diff --git a/.toys/ci.rb b/.toys/ci.rb index ea26814..32e507a 100644 --- a/.toys/ci.rb +++ b/.toys/ci.rb @@ -123,7 +123,8 @@ def setup_auth_env "GCLOUD_TEST_PROJECT" => final_project, "GOOGLE_CLOUD_PROJECT" => final_project, "GCLOUD_TEST_KEYFILE" => final_keyfile, - "GOOGLE_APPLICATION_CREDENTIALS" => ENV["GOOGLE_APPLICATION_CREDENTIALS"] + "GOOGLE_APPLICATION_CREDENTIALS" => ENV["GOOGLE_APPLICATION_CREDENTIALS"], + "MT_COMPAT" => "true" } end diff --git a/google-cloud-spanner/acceptance/spanner/client/batch_update_test.rb b/google-cloud-spanner/acceptance/spanner/client/batch_update_test.rb index b85140d..b3cbf37 100644 --- a/google-cloud-spanner/acceptance/spanner/client/batch_update_test.rb +++ b/google-cloud-spanner/acceptance/spanner/client/batch_update_test.rb @@ -80,7 +80,7 @@ _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true row_counts = tx.batch_update do |b| b.batch_update insert_dml[dialect], params: insert_params[dialect] @@ -107,7 +107,7 @@ _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true err = expect do tx.batch_update { |b| } # rubocop:disable Lint/EmptyBlock @@ -124,7 +124,7 @@ _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true begin tx.batch_update do |b| b.batch_update insert_dml[dialect], params: insert_params[dialect] @@ -148,12 +148,31 @@ _(timestamp).must_be_kind_of Time end + it "raises BatchUpdateError when the first statement in Batch DML is a syntax error for #{dialect}" do + prior_results = db[dialect].execute_sql "SELECT * FROM accounts" + _(prior_results.rows.count).must_equal 3 + db[dialect].transaction do |tx| + begin + _(tx.no_existing_transaction?).must_equal true + tx.batch_update do |b| + b.batch_update update_dml_syntax_error, params: update_params[dialect] + end + rescue Google::Cloud::Spanner::BatchUpdateError => e + _(e.cause).must_be_kind_of Google::Cloud::InvalidArgumentError + _(e.cause.message).must_equal "Statement 0: 'UPDDDD accounts' is not valid DML." + end + _(tx.no_existing_transaction?).must_equal true + end + prior_results = db[dialect].execute_sql "SELECT * FROM accounts" + _(prior_results.rows.count).must_equal 3 + end + it "runs execute_update and batch_update in the same transaction for #{dialect}" do prior_results = db[dialect].execute_sql "SELECT * FROM accounts" _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true row_counts = tx.batch_update do |b| b.batch_update insert_dml[dialect], params: insert_params[dialect] diff --git a/google-cloud-spanner/acceptance/spanner/client/dml_test.rb b/google-cloud-spanner/acceptance/spanner/client/dml_test.rb index d83a51b..a019953 100644 --- a/google-cloud-spanner/acceptance/spanner/client/dml_test.rb +++ b/google-cloud-spanner/acceptance/spanner/client/dml_test.rb @@ -71,7 +71,7 @@ _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true # Execute a DML using execute_update and make sure data is updated and correct count is returned. insert_row_count = tx.execute_update \ @@ -112,7 +112,7 @@ _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true # Execute a DML using execute_update and make sure data is updated and correct count is returned. insert_row_count = tx.execute_update \ @@ -141,7 +141,7 @@ _(prior_results.rows.count).must_equal 3 timestamp = db[dialect].transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true # Execute a DML statement, followed by calling existing insert method, # commit the transaction and assert that both the updates are present. diff --git a/google-cloud-spanner/acceptance/spanner/client/transaction_test.rb b/google-cloud-spanner/acceptance/spanner/client/transaction_test.rb index c594860..e230d42 100644 --- a/google-cloud-spanner/acceptance/spanner/client/transaction_test.rb +++ b/google-cloud-spanner/acceptance/spanner/client/transaction_test.rb @@ -35,9 +35,35 @@ db.delete "accounts" end + it "runs basic inline begin transaction" do + db.transaction do |tx| + _(tx.no_existing_transaction?).must_equal true + tx_results = tx.execute_query "SELECT * from accounts" + _(tx.transaction_id).wont_be :nil? + _(tx_results.rows.count).must_equal default_account_rows.length + end + end + + it "re-uses existing transaction for multiple queries" do + db.transaction do |tx| + _(tx.no_existing_transaction?).must_equal true + + tx_results = tx.execute_query "SELECT * from accounts" + tx_id_1 = tx.transaction_id + _(tx_id_1).wont_be :nil? + _(tx_results.rows.count).must_equal default_account_rows.length + + tx_results_2 = tx.execute_query "SELECT * from accounts WHERE active = true" + tx_id_2 = tx.transaction_id + _(tx_id_2).wont_be :nil? + _(tx_id_1).must_equal tx_id_2 + _(tx_results_2.rows.count).must_equal 2 + end + end + it "modifies accounts and verifies data with reads" do timestamp = db.transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true tx_results = tx.read "accounts", columns _(tx_results).must_be_kind_of Google::Cloud::Spanner::Results @@ -70,7 +96,7 @@ it "can rollback a transaction without passing on using Rollback" do timestamp = db.transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true tx_results = tx.read "accounts", columns _(tx_results).must_be_kind_of Google::Cloud::Spanner::Results @@ -101,7 +127,7 @@ it "can rollback a transaction and pass on the error" do assert_raises ZeroDivisionError do db.transaction do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true tx_results = tx.read "accounts", columns _(tx_results).must_be_kind_of Google::Cloud::Spanner::Results @@ -220,7 +246,7 @@ commit_options = { return_commit_stats: true } commit_resp = db.transaction commit_options: commit_options do |tx| - _(tx.transaction_id).wont_be :nil? + _(tx.no_existing_transaction?).must_equal true tx.insert "accounts", additional_account end diff --git a/google-cloud-spanner/lib/google/cloud/spanner/batch_update_results.rb b/google-cloud-spanner/lib/google/cloud/spanner/batch_update_results.rb new file mode 100644 index 0000000..6467eae --- /dev/null +++ b/google-cloud-spanner/lib/google/cloud/spanner/batch_update_results.rb @@ -0,0 +1,50 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +module Google + module Cloud + module Spanner + ## + # @private Helper class to process BatchDML response + class BatchUpdateResults + ## Object of type + # Google::Cloud::Spanner::V1::ExecuteBatchDmlResponse + attr_reader :grpc + + def initialize grpc + @grpc = grpc + end + + def row_counts + if @grpc.status.code.zero? + @grpc.result_sets.map { |rs| rs.stats.row_count_exact } + else + begin + raise Google::Cloud::Error.from_error @grpc.status + rescue Google::Cloud::Error + raise Google::Cloud::Spanner::BatchUpdateError.from_grpc @grpc + end + end + end + + ## + # Returns transaction if available. Otherwise returns nil + def transaction + @grpc&.result_sets&.first&.metadata&.transaction + end + end + end + end +end diff --git a/google-cloud-spanner/lib/google/cloud/spanner/client.rb b/google-cloud-spanner/lib/google/cloud/spanner/client.rb index ece30cb..3092904 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/client.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/client.rb @@ -49,6 +49,10 @@ module Spanner # end # class Client + ## + # @private + IS_TRANSACTION_RUNNING_KEY = "ruby_spanner_is_transaction_running".freeze + ## # @private Creates a new Spanner Client instance. def initialize project, instance_id, database_id, session_labels: nil, @@ -1634,6 +1638,7 @@ def commit commit_options: nil, request_options: nil, # rubocop:disable Metrics/MethodLength # rubocop:disable Metrics/BlockLength + ## # Creates a transaction for reads and writes that execute atomically at # a single logical point in time across columns, rows, and tables in a @@ -1788,7 +1793,7 @@ def commit commit_options: nil, request_options: nil, def transaction deadline: 120, commit_options: nil, request_options: nil, call_options: nil ensure_service! - unless Thread.current[:transaction_id].nil? + unless Thread.current[IS_TRANSACTION_RUNNING_KEY].nil? raise "Nested transactions are not allowed" end @@ -1799,17 +1804,20 @@ def transaction deadline: 120, commit_options: nil, request_options = Convert.to_request_options \ request_options, tag_type: :transaction_tag - @pool.with_transaction do |tx| + @pool.with_session do |session| + tx = session.create_empty_transaction if request_options tx.transaction_tag = request_options[:transaction_tag] end begin - Thread.current[:transaction_id] = tx.transaction_id + Thread.current[IS_TRANSACTION_RUNNING_KEY] = true yield tx + transaction_id = nil + transaction_id = tx.transaction_id if tx.existing_transaction? commit_resp = @project.service.commit \ tx.session.path, tx.mutations, - transaction_id: tx.transaction_id, + transaction_id: transaction_id, commit_options: commit_options, request_options: request_options, call_options: call_options @@ -1819,28 +1827,21 @@ def transaction deadline: 120, commit_options: nil, Google::Cloud::AbortedError, GRPC::Internal, Google::Cloud::InternalError => e - raise e if internal_error_and_not_retryable? e - # Re-raise if deadline has passed - if current_time - start_time > deadline - if e.is_a? GRPC::BadStatus - e = Google::Cloud::Error.from_error e - end - raise e - end + check_and_propagate_err! e, (current_time - start_time > deadline) # Sleep the amount from RetryDelay, or incremental backoff sleep(delay_from_aborted(e) || backoff *= 1.3) # Create new transaction on the session and retry the block - tx = tx.session.create_transaction + tx = session.create_transaction retry rescue StandardError => e # Rollback transaction when handling unexpected error - tx.session.rollback tx.transaction_id + tx.session.rollback tx.transaction_id if tx.existing_transaction? # Return nil if raised with rollback. return nil if e.is_a? Rollback # Re-raise error. raise e ensure - Thread.current[:transaction_id] = nil + Thread.current[IS_TRANSACTION_RUNNING_KEY] = nil end end end @@ -1923,7 +1924,7 @@ def snapshot strong: nil, timestamp: nil, read_timestamp: nil, exact_staleness: exact_staleness ensure_service! - unless Thread.current[:transaction_id].nil? + unless Thread.current[IS_TRANSACTION_RUNNING_KEY].nil? raise "Nested snapshots are not allowed" end @@ -1933,11 +1934,11 @@ def snapshot strong: nil, timestamp: nil, read_timestamp: nil, timestamp: (timestamp || read_timestamp), staleness: (staleness || exact_staleness), call_options: call_options - Thread.current[:transaction_id] = snp_grpc.id + Thread.current[IS_TRANSACTION_RUNNING_KEY] = true snp = Snapshot.from_grpc snp_grpc, session yield snp if block_given? ensure - Thread.current[:transaction_id] = nil + Thread.current[IS_TRANSACTION_RUNNING_KEY] = nil end nil end @@ -2272,6 +2273,18 @@ def delay_from_aborted err nil end + ## + # Determines if a transaction error should be propagated to the user. + # And re-raises the error accordingly + def check_and_propagate_err! err, deadline_passed + raise err if internal_error_and_not_retryable? err + return unless deadline_passed + if err.is_a? GRPC::BadStatus + raise Google::Cloud::Error.from_error err + end + raise err + end + def internal_error_and_not_retryable? error (error.instance_of?(Google::Cloud::InternalError) || error.instance_of?(GRPC::Internal)) && diff --git a/google-cloud-spanner/lib/google/cloud/spanner/pool.rb b/google-cloud-spanner/lib/google/cloud/spanner/pool.rb index 597dcdf..dc31393 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/pool.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/pool.rb @@ -29,19 +29,15 @@ module Spanner # {Google::Cloud::Spanner::Session} instances. # class Pool - attr_accessor :all_sessions - attr_accessor :session_stack - attr_accessor :transaction_stack + attr_accessor :sessions_available + attr_accessor :sessions_in_use def initialize client, min: 10, max: 100, keepalive: 1800, - write_ratio: 0.3, fail: true, threads: nil + fail: true, threads: nil @client = client @min = min @max = max @keepalive = keepalive - @write_ratio = write_ratio - @write_ratio = 0 if write_ratio.negative? - @write_ratio = 1 if write_ratio > 1 @fail = fail @threads = threads || [2, Concurrent.processor_count * 2].max @@ -69,10 +65,11 @@ def checkout_session # Use LIFO to ensure sessions are used from backend caches, which # will reduce the read / write latencies on user requests. - read_session = session_stack.pop # LIFO - return read_session if read_session - write_transaction = transaction_stack.pop # LIFO - return write_transaction.session if write_transaction + read_session = sessions_available.pop # LIFO + if read_session + sessions_in_use << read_session + return read_session + end if can_allocate_more_sessions? action = :new @@ -85,73 +82,25 @@ def checkout_session end end - return new_session! if action == :new - end - - def checkin_session session - @mutex.synchronize do - unless all_sessions.include? session - raise ArgumentError, "Cannot checkin session" + if action == :new + session = new_session! + @mutex.synchronize do + sessions_in_use << session end - - session_stack.push session - - @resource.signal + return session end nil end - def with_transaction - tx = checkout_transaction - begin - yield tx - ensure - future do - # Create and checkin a new transaction - tx = tx.session.create_transaction - checkin_transaction tx - end - end - end - - def checkout_transaction - action = nil - @mutex.synchronize do - loop do - raise ClientClosedError if @closed - - write_transaction = transaction_stack.pop # LIFO - return write_transaction if write_transaction - read_session = session_stack.pop - if read_session - action = read_session - break - end - - if can_allocate_more_sessions? - action = :new - break - end - - raise SessionLimitError if @fail - - @resource.wait @mutex - end - end - if action.is_a? Google::Cloud::Spanner::Session - return action.create_transaction - end - return new_transaction! if action == :new - end - - def checkin_transaction txn + def checkin_session session @mutex.synchronize do - unless all_sessions.include? txn.session + unless sessions_in_use.include? session raise ArgumentError, "Cannot checkin session" end - transaction_stack.push txn + sessions_available.push session + sessions_in_use.delete_if { |s| s.session_id == session.session_id } @resource.signal end @@ -182,22 +131,20 @@ def keepalive_or_release! to_release = [] @mutex.synchronize do - available_count = session_stack.count + transaction_stack.count + available_count = sessions_available.count release_count = @min - available_count release_count = 0 if release_count.negative? - to_keepalive += (session_stack + transaction_stack).select do |x| + to_keepalive += sessions_available.select do |x| x.idle_since? @keepalive end - # Remove a random portion of the sessions and transactions + # Remove a random portion of the sessions to_release = to_keepalive.sample release_count to_keepalive -= to_release # Remove those to be released from circulation - @all_sessions -= to_release.map(&:session) - @session_stack -= to_release - @transaction_stack -= to_release + @sessions_available -= to_release end to_release.each { |x| future { x.release! } } @@ -212,19 +159,11 @@ def init max_threads: @threads # init the stacks @new_sessions_in_process = 0 - @transaction_stack = [] # init the keepalive task create_keepalive_task! # init session stack - @all_sessions = @client.batch_create_new_sessions @min - sessions = @all_sessions.dup - num_transactions = (@min * @write_ratio).round - pending_transactions = sessions.shift num_transactions - # init transaction stack - pending_transactions.each do |transaction| - future { checkin_transaction transaction.create_transaction } - end - @session_stack = sessions + @sessions_available = @client.batch_create_new_sessions @min + @sessions_in_use = [] end def shutdown @@ -236,10 +175,10 @@ def shutdown @resource.broadcast # Delete all sessions @mutex.synchronize do - @all_sessions.each { |s| future { s.release! } } - @all_sessions = [] - @session_stack = [] - @transaction_stack = [] + sessions_available.each { |s| future { s.release! } } + sessions_in_use.each { |s| future { s.release! } } + @sessions_available = [] + @sessions_in_use = [] end # shutdown existing thread pool @thread_pool.shutdown @@ -261,19 +200,14 @@ def new_session! @mutex.synchronize do @new_sessions_in_process -= 1 - all_sessions << session end session end - def new_transaction! - new_session!.create_transaction - end - def can_allocate_more_sessions? # This is expected to be called from within a synchronize block - all_sessions.size + @new_sessions_in_process < @max + sessions_available.size + sessions_in_use.size + @new_sessions_in_process < @max end def create_keepalive_task! diff --git a/google-cloud-spanner/lib/google/cloud/spanner/project.rb b/google-cloud-spanner/lib/google/cloud/spanner/project.rb index c917e37..00728c2 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/project.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/project.rb @@ -515,17 +515,14 @@ def create_database instance_id, database_id, statements: [], # before an attempt is made to prevent the idle sessions from being # closed by the Cloud Spanner service. The default is 1800 (30 # minutes). - # * `:write_ratio` (Float) The ratio of sessions with pre-allocated - # transactions to those without. Pre-allocating transactions - # improves the performance of writes made by the client. The higher - # the value, the more transactions are pre-allocated. The value must - # be >= 0 and <= 1. The default is 0.3. # * `:fail` (true/false) When `true` the client raises a # {SessionLimitError} when the client has allocated the `max` number # of sessions. When `false` the client blocks until a session # becomes available. The default is `true`. # * `:threads` (Integer) The number of threads in the thread pool. The # default is twice the number of available CPUs. + # * `:write_ratio` (Float) Deprecated. This field is no longer needed + # and will be removed in a future release. # @param [Hash] labels The labels to be applied to all sessions # created by the client. Cloud Labels are a flexible and lightweight # mechanism for organizing cloud resources into groups that reflect a @@ -674,8 +671,7 @@ def database_path instance_id, database_id def valid_session_pool_options opts = {} { min: opts[:min], max: opts[:max], keepalive: opts[:keepalive], - write_ratio: opts[:write_ratio], fail: opts[:fail], - threads: opts[:threads] + fail: opts[:fail], threads: opts[:threads] }.compact end end diff --git a/google-cloud-spanner/lib/google/cloud/spanner/results.rb b/google-cloud-spanner/lib/google/cloud/spanner/results.rb index 7c99e76..2d9e38c 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/results.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/results.rb @@ -41,6 +41,11 @@ module Spanner # end # class Results + ## + # @private Object of type + # Google::Cloud::Spanner::V1::ResultSetMetadata + attr_reader :metadata + ## # The read timestamp chosen for single-use snapshots (read-only # transactions). @@ -73,6 +78,13 @@ def fields @fields ||= Fields.from_grpc @metadata.row_type.fields end + ## + # @private + # Returns a transaction if available + def transaction + @metadata&.transaction + end + # rubocop:disable all ## diff --git a/google-cloud-spanner/lib/google/cloud/spanner/service.rb b/google-cloud-spanner/lib/google/cloud/spanner/service.rb index 3ead6eb..82dfaa1 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/service.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/service.rb @@ -360,17 +360,7 @@ def execute_batch_dml session_name, transaction, statements, seqno, seqno: seqno, request_options: request_options } - results = service.execute_batch_dml request, opts - - if results.status.code.zero? - results.result_sets.map { |rs| rs.stats.row_count_exact } - else - begin - raise Google::Cloud::Error.from_error results.status - rescue Google::Cloud::Error - raise Google::Cloud::Spanner::BatchUpdateError.from_grpc results - end - end + service.execute_batch_dml request, opts end def streaming_read_table session_name, table_name, columns, keys: nil, diff --git a/google-cloud-spanner/lib/google/cloud/spanner/session.rb b/google-cloud-spanner/lib/google/cloud/spanner/session.rb index 62527a3..718cfc9 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/session.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/session.rb @@ -1173,6 +1173,14 @@ def create_transaction Transaction.from_grpc tx_grpc, self end + ## + # @private + # Creates a new transaction object without the grpc object + # within it. Use it for inline-begin of a transaction. + def create_empty_transaction + Transaction.from_grpc nil, self + end + ## # Reloads the session resource. Useful for determining if the session is # still valid on the Spanner API. diff --git a/google-cloud-spanner/lib/google/cloud/spanner/transaction.rb b/google-cloud-spanner/lib/google/cloud/spanner/transaction.rb index 8d46b63..abc3791 100644 --- a/google-cloud-spanner/lib/google/cloud/spanner/transaction.rb +++ b/google-cloud-spanner/lib/google/cloud/spanner/transaction.rb @@ -17,6 +17,7 @@ require "google/cloud/spanner/convert" require "google/cloud/spanner/results" require "google/cloud/spanner/commit" +require "google/cloud/spanner/batch_update_results" module Google module Cloud @@ -74,6 +75,9 @@ module Spanner # end # class Transaction + # @private The `Google::Cloud::Spanner::V1::Transaction` object. + attr_reader :grpc + # @private The Session object. attr_accessor :session @@ -83,13 +87,31 @@ class Transaction def initialize @commit = Commit.new @seqno = 0 + + # Mutex to enfore thread safety for transaction creation and query executions. + # + # This mutex protects two things: + # (1) the generation of sequence numbers + # (2) the creation of transactions. + # + # Specifically, @seqno is protected so it always reflects the last sequence number + # generated and provided to an operation in any thread. Any acquisition of a + # sequence number must be synchronized. + # + # Furthermore, @grpc is protected such that it either is nil if the + # transaction has not yet been created, or references the transaction + # resource if the transaction has been created. Any operation that could + # create a transaction must be synchronized, and any logic that depends on + # the state of transaction creation must also be synchronized. + @mutex = Mutex.new end ## # Identifier of the transaction results were run in. # @return [String] The transaction id. def transaction_id - return nil if @grpc.nil? + return @grpc.id if existing_transaction? + safe_begin_transaction @grpc.id end @@ -337,15 +359,18 @@ def execute_query sql, params: nil, types: nil, query_options: nil, request_options: nil, call_options: nil ensure_session! - @seqno += 1 - params, types = Convert.to_input_params_and_types params, types request_options = build_request_options request_options - session.execute_query sql, params: params, types: types, - transaction: tx_selector, seqno: @seqno, - query_options: query_options, - request_options: request_options, - call_options: call_options + + safe_execute do |seqno| + results = session.execute_query sql, params: params, types: types, + transaction: tx_selector, seqno: seqno, + query_options: query_options, + request_options: request_options, + call_options: call_options + @grpc ||= results.transaction + results + end end alias execute execute_query alias query execute_query @@ -612,12 +637,24 @@ def execute_update sql, params: nil, types: nil, query_options: nil, # def batch_update request_options: nil, call_options: nil, &block ensure_session! - @seqno += 1 request_options = build_request_options request_options - session.batch_update tx_selector, @seqno, - request_options: request_options, - call_options: call_options, &block + safe_execute do |seqno| + batch_update_results = nil + begin + response = session.batch_update tx_selector, seqno, + request_options: request_options, + call_options: call_options, &block + batch_update_results = BatchUpdateResults.new response + row_counts = batch_update_results.row_counts + @grpc ||= batch_update_results.transaction + return row_counts + rescue Google::Cloud::Spanner::BatchUpdateError + @grpc ||= batch_update_results.transaction + # Re-raise after extracting transaction + raise + end + end end ## @@ -686,10 +723,15 @@ def read table, columns, keys: nil, index: nil, limit: nil, columns = Array(columns).map(&:to_s) keys = Convert.to_key_set keys request_options = build_request_options request_options - session.read table, columns, keys: keys, index: index, limit: limit, - transaction: tx_selector, - request_options: request_options, - call_options: call_options + + safe_execute do + results = session.read table, columns, keys: keys, index: index, limit: limit, + transaction: tx_selector, + request_options: request_options, + call_options: call_options + @grpc ||= results.transaction + results + end end ## @@ -1111,12 +1153,69 @@ def self.from_grpc grpc, session end end + ## + # @private Checks if a transaction is already created. + def existing_transaction? + !no_existing_transaction? + end + + ## + # @private Checks if transaction is not already created. + def no_existing_transaction? + @grpc.nil? + end + protected - # The TransactionSelector to be used for queries + ## + # @private Facilitates a thread-safe execution of an rpc + # for inline-begin of a transaction. This method is optimised to + # use mutexes only when necessary, while still acheiving thread-safety. + # + # Note: Do not use @seqno directly while using this method. Instead, use + # the seqno variable passed to the block. + def safe_execute + loop do + if existing_transaction? + # Create a local copy of @seqno to avoid concurrent + # operations overriding the incremented value. + seqno = safe_next_seqno + # If a transaction already exists, execute rpc without mutex + return yield seqno + end + + @mutex.synchronize do + next if existing_transaction? + @seqno += 1 + return yield @seqno + end + end + end + + ## + # Create a new transaction in a thread-safe manner. + def safe_begin_transaction + @mutex.synchronize do + return if existing_transaction? + ensure_session! + @grpc = service.begin_transaction session.path + end + end + + ## + # @private The TransactionSelector to be used for queries. This method must + # be called from within a synchronized block, since the value returned + # depends on the state of @grpc field. + # + # This method is expected to be called from within `safe_execute()` method's block, + # since it provides synchronization and gurantees thread safety. def tx_selector - return nil if transaction_id.nil? - V1::TransactionSelector.new id: transaction_id + return V1::TransactionSelector.new id: transaction_id if existing_transaction? + V1::TransactionSelector.new( + begin: V1::TransactionOptions.new( + read_write: V1::TransactionOptions::ReadWrite.new + ) + ) end ## @@ -1133,6 +1232,15 @@ def build_request_options options options end + ## + # @private Generates the next seqno in a thread-safe manner. + def safe_next_seqno + @mutex.synchronize do + @seqno += 1 + return @seqno + end + end + ## # @private Raise an error unless an active connection to the service is # available. diff --git a/google-cloud-spanner/test/google/cloud/spanner/client/close_test.rb b/google-cloud-spanner/test/google/cloud/spanner/client/close_test.rb index cbf7ae2..7832f6c 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/client/close_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/client/close_test.rb @@ -30,8 +30,8 @@ before do session.instance_variable_set :@last_updated_at, Time.now p = client.instance_variable_get :@pool - p.all_sessions = [session] - p.session_stack = [session] + p.sessions_available = [session] + p.sessions_in_use = [] end it "deletes sessions when closed" do diff --git a/google-cloud-spanner/test/google/cloud/spanner/client/transaction_retry_test.rb b/google-cloud-spanner/test/google/cloud/spanner/client/transaction_retry_test.rb index 774a2b9..48fdf16 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/client/transaction_retry_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/client/transaction_retry_test.rb @@ -25,6 +25,13 @@ let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_begin) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new + ) + ) + end let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let :results_hash do { @@ -42,7 +49,8 @@ { name: "project_ids", type: { code: :ARRAY, array_element_type: { code: :INT64 } } } ] - } + }, + transaction: { id: transaction_id }, }, values: [ { string_value: "1" }, @@ -77,21 +85,13 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :begin_transaction, transaction_grpc, [{ session: session_grpc.name, options: tx_opts, request_options: nil }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args # first time called this will raise if @called == nil @@ -137,21 +137,13 @@ def mock.commit *args mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :begin_transaction, transaction_grpc, [{ session: session_grpc.name, options: tx_opts, request_options: nil }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args # first time called this will raise if @called == nil @@ -195,21 +187,13 @@ def mock.commit *args mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :begin_transaction, transaction_grpc, [{ session: session_grpc.name, options: tx_opts, request_options: nil }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args # first time called this will raise if @called == nil @@ -253,10 +237,7 @@ def mock.commit *args mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :begin_transaction, transaction_grpc, [{ session: session_grpc.name, options: tx_opts, request_options: nil @@ -268,11 +249,6 @@ def mock.commit *args }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args # first time called this will raise if @called == nil @@ -321,10 +297,7 @@ def mock.commit *args mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :begin_transaction, transaction_grpc, [{ session: session_grpc.name, options: tx_opts, request_options: nil @@ -346,11 +319,6 @@ def mock.commit *args }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args raise GRPC::Aborted.new "aborted" end @@ -401,21 +369,13 @@ def mock.commit *args mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :begin_transaction, transaction_grpc, [{ session: session_grpc.name, options: tx_opts, request_options: nil }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args # first time called this will raise if @called == nil @@ -459,10 +419,6 @@ def mock.commit *args mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - def mock.commit *args # first time called this will raise diff --git a/google-cloud-spanner/test/google/cloud/spanner/client/transaction_rollback_test.rb b/google-cloud-spanner/test/google/cloud/spanner/client/transaction_rollback_test.rb index c8bf48c..b507548 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/client/transaction_rollback_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/client/transaction_rollback_test.rb @@ -24,6 +24,13 @@ let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_begin) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new + ) + ) + end let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let :results_hash do { @@ -41,7 +48,8 @@ { name: "project_ids", type: { code: :ARRAY, array_element_type: { code: :INT64 } } } ] - } + }, + transaction: { id: transaction_id }, }, values: [ { string_value: "1" }, @@ -67,15 +75,8 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :rollback, nil, [{ session: session_grpc.name, transaction_id: transaction_id }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] results = nil timestamp = client.transaction do |tx| @@ -99,15 +100,8 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options mock.expect :rollback, nil, [{ session: session_grpc.name, transaction_id: transaction_id }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] results = nil assert_raises ZeroDivisionError do @@ -131,18 +125,11 @@ it "does not allow nested transactions" do mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - mock.expect :rollback, nil, [{ session: session_grpc.name, transaction_id: transaction_id }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] spanner.service.mocked_service = mock nested_error = assert_raises RuntimeError do client.transaction do |tx| + # These mutation will never reach rpc layer, so no mocks for them. tx.update "users", [{ id: 1, name: "Charlie", active: false }] tx.insert "users", [{ id: 2, name: "Harvey", active: true }] tx.upsert "users", [{ id: 3, name: "Marley", active: false }] diff --git a/google-cloud-spanner/test/google/cloud/spanner/client/transaction_test.rb b/google-cloud-spanner/test/google/cloud/spanner/client/transaction_test.rb index 509c75f..a91c5eb 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/client/transaction_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/client/transaction_test.rb @@ -23,7 +23,24 @@ let(:transaction_id) { "tx789" } let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } - let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_with_id) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new( + read_lock_mode: :READ_LOCK_MODE_UNSPECIFIED + ) + ) + ) + end + let(:tx_no_dml_options) do + # Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id + Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new( + read_lock_mode: :READ_LOCK_MODE_UNSPECIFIED + ) + ) + end let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let :results_hash do { @@ -41,7 +58,8 @@ { name: "project_ids", type: { code: :ARRAY, array_element_type: { code: :INT64 } } } ] - } + }, + transaction: { id: transaction_id }, }, values: [ { string_value: "1" }, @@ -131,18 +149,11 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options mock.expect :commit, commit_resp, [{ session: session_grpc.name, mutations: [], transaction_id: transaction_id, single_use_transaction: nil, request_options: nil }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] results = nil timestamp = client.transaction do |tx| @@ -170,17 +181,11 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] + spanner.service.mocked_service = mock timestamp = client.transaction do |tx| @@ -205,16 +210,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -240,16 +238,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -275,16 +266,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -310,16 +294,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -348,16 +325,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -384,16 +354,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil, + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -422,16 +385,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -456,16 +412,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -518,16 +467,9 @@ mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: nil - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: nil }, default_options] spanner.service.mocked_service = mock @@ -559,18 +501,11 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options mock.expect :commit, commit_resp, [{ session: session_grpc.name, mutations: [], transaction_id: transaction_id, single_use_transaction: nil, request_options: nil }, expect_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] results = nil timestamp = client.transaction call_options: call_options do |tx| @@ -594,16 +529,9 @@ it "commits multiple mutations" do mock = Minitest::Mock.new mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] mock.expect :commit, commit_resp, [{ - session: session_grpc.name, mutations: mutations, transaction_id: transaction_id, - single_use_transaction: nil, request_options: request_options - }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil + session: session_grpc.name, mutations: mutations, transaction_id: nil, + single_use_transaction: tx_no_dml_options, request_options: request_options }, default_options] spanner.service.mocked_service = mock @@ -625,18 +553,11 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, request_options: request_options, options: default_options mock.expect :commit, commit_resp, [{ session: session_grpc.name, mutations: [], transaction_id: transaction_id, single_use_transaction: nil, request_options: nil }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] timestamp = client.transaction do |tx| tx.execute_query "SELECT * FROM users", request_options: request_options @@ -662,16 +583,13 @@ mock = Minitest::Mock.new spanner.service.mocked_service = mock mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, request_options: { transaction_tag: "Tag-1", request_tag: "Tag-1-1" }, options: default_options expect_execute_streaming_sql update_results_enum, session_grpc.name, - "UPDATE users SET active = true", transaction: tx_selector, + "UPDATE users SET active = true", transaction: tx_selector_with_id, seqno: 2, request_options: { transaction_tag: "Tag-1", request_tag: "Tag-1-2" }, options: default_options @@ -680,10 +598,6 @@ single_use_transaction: nil, request_options: { transaction_tag: "Tag-1" } }, default_options] - # transaction checkin - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] client.transaction request_options: { tag: "Tag-1" } do |tx| _(tx).must_be_kind_of Google::Cloud::Spanner::Transaction @@ -730,4 +644,4 @@ def assert_results results _(row[:avatar].read).must_equal "image" _(row[:project_ids]).must_equal [1, 2, 3] end -end \ No newline at end of file +end diff --git a/google-cloud-spanner/test/google/cloud/spanner/pool/batch_create_sessions_test.rb b/google-cloud-spanner/test/google/cloud/spanner/pool/batch_create_sessions_test.rb new file mode 100644 index 0000000..e918d2e --- /dev/null +++ b/google-cloud-spanner/test/google/cloud/spanner/pool/batch_create_sessions_test.rb @@ -0,0 +1,60 @@ +# Copyright 2017 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "helper" + +describe Google::Cloud::Spanner::Pool, :batch_create_sessions, :mock_spanner do + let(:instance_id) { "my-instance-id" } + let(:database_id) { "my-database-id" } + let(:client) { spanner.client instance_id, database_id, pool: { min: 0, max: 4 } } + let(:tx_opts) { Google::Cloud::Spanner::V1::TransactionOptions.new(read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new) } + let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } + let(:client_pool) do + session.instance_variable_set :@last_updated_at, Time.now + p = client.instance_variable_get :@pool + p.sessions_available = [session] + p.sessions_in_use = [] + p + end + + after do + shutdown_client! client + end + + it "calls batch_create_sessions until min number of sessions are returned" do + mock = Minitest::Mock.new + spanner.service.mocked_service = mock + sessions_1 = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( + session: [ + Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-001")), + ] + ) + sessions_2 = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( + session: [ + Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-002")), + ] + ) + mock.expect :batch_create_sessions, sessions_1, [{ database: database_path(instance_id, database_id), session_count: 2, session_template: nil }, default_options] + mock.expect :batch_create_sessions, sessions_2, [{ database: database_path(instance_id, database_id), session_count: 1, session_template: nil }, default_options] + + pool = Google::Cloud::Spanner::Pool.new client, min: 2 + + shutdown_pool! pool + + _(pool.sessions_available.size).must_equal 2 + _(pool.sessions_in_use.size).must_equal 0 + + mock.verify + end +end diff --git a/google-cloud-spanner/test/google/cloud/spanner/pool/close_test.rb b/google-cloud-spanner/test/google/cloud/spanner/pool/close_test.rb index 6ccbcc4..1da89a1 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/pool/close_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/pool/close_test.rb @@ -25,8 +25,8 @@ let(:pool) do session.instance_variable_set :@last_updated_at, Time.now p = client.instance_variable_get :@pool - p.all_sessions = [session] - p.session_stack = [session] + p.sessions_available = [session] + p.sessions_in_use = [] p end diff --git a/google-cloud-spanner/test/google/cloud/spanner/pool/keepalive_or_release_test.rb b/google-cloud-spanner/test/google/cloud/spanner/pool/keepalive_or_release_test.rb index 1f0a125..8a408b2 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/pool/keepalive_or_release_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/pool/keepalive_or_release_test.rb @@ -55,8 +55,8 @@ # update the session so it was last updated an hour ago session.instance_variable_set :@last_updated_at, Time.now - 60*60 # set the session in the pool - pool.all_sessions = [session] - pool.session_stack = [session] + pool.sessions_available = [session] + pool.sessions_in_use = [] mock = Minitest::Mock.new session.service.mocked_service = mock @@ -69,49 +69,12 @@ mock.verify end - it "calls keepalive on the transactions that need it" do - # update the session so it was last updated an hour ago - session.instance_variable_set :@last_updated_at, Time.now - 60*60 - # set the session in the pool - pool.all_sessions = [session] - pool.transaction_stack = [transaction] - - mock = Minitest::Mock.new - mock.expect :begin_transaction, transaction_grpc, [{ - session: session_grpc.name, options: tx_opts, request_options: nil - }, default_options] - session.service.mocked_service = mock - - pool.keepalive_or_release! - - shutdown_pool! pool - - mock.verify - end - it "doesn't call keepalive on sessions that don't need it" do # update the session so it was last updated now session.instance_variable_set :@last_updated_at, Time.now # set the session in the pool - pool.all_sessions = [session] - pool.session_stack = [session] - - mock = Minitest::Mock.new - session.service.mocked_service = mock - - pool.keepalive_or_release! - - shutdown_pool! pool - - mock.verify - end - - it "doesn't call keepalive on transactions that don't need it" do - # update the session so it was last updated now - session.instance_variable_set :@last_updated_at, Time.now - # set the session in the pool - pool.all_sessions = [session] - pool.transaction_stack = [transaction] + pool.sessions_available = [session] + pool.sessions_in_use = [] mock = Minitest::Mock.new session.service.mocked_service = mock diff --git a/google-cloud-spanner/test/google/cloud/spanner/pool/new_sessions_in_process_test.rb b/google-cloud-spanner/test/google/cloud/spanner/pool/new_sessions_in_process_test.rb index e035365..89aef54 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/pool/new_sessions_in_process_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/pool/new_sessions_in_process_test.rb @@ -26,9 +26,8 @@ let(:pool) do session.instance_variable_set :@last_updated_at, Time.now p = client.instance_variable_get :@pool - p.all_sessions = [session] - p.session_stack = [session] - p.transaction_stack = [] + p.sessions_available = [session] + p.sessions_in_use = [] p end @@ -43,14 +42,14 @@ def stub.create_session *args end spanner.service.mocked_service = stub - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 + _(pool.sessions_available.size).must_equal 1 + _(pool.sessions_in_use.size).must_equal 0 _(pool.instance_variable_get(:@new_sessions_in_process)).must_equal 0 s1 = pool.checkout_session # gets the one session from the stack - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 + _(pool.sessions_available.size).must_equal 0 + _(pool.sessions_in_use.size).must_equal 1 _(pool.instance_variable_get(:@new_sessions_in_process)).must_equal 0 raised_error = assert_raises Google::Cloud::Error do @@ -58,8 +57,8 @@ def stub.create_session *args end _(raised_error.message).must_equal "11:sumthin happen" - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 + _(pool.sessions_available.size).must_equal 0 + _(pool.sessions_in_use.size).must_equal 1 _(pool.instance_variable_get(:@new_sessions_in_process)).must_equal 0 10.times do @@ -69,16 +68,16 @@ def stub.create_session *args _(raised_error.message).must_equal "11:sumthin happen" end - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 + _(pool.sessions_available.size).must_equal 0 + _(pool.sessions_in_use.size).must_equal 1 _(pool.instance_variable_get(:@new_sessions_in_process)).must_equal 0 pool.checkin_session s1 shutdown_pool! pool - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 + _(pool.sessions_available.size).must_equal 1 + _(pool.sessions_in_use.size).must_equal 0 _(pool.instance_variable_get(:@new_sessions_in_process)).must_equal 0 end end diff --git a/google-cloud-spanner/test/google/cloud/spanner/pool/write_ratio_test.rb b/google-cloud-spanner/test/google/cloud/spanner/pool/write_ratio_test.rb deleted file mode 100644 index 53e03cc..0000000 --- a/google-cloud-spanner/test/google/cloud/spanner/pool/write_ratio_test.rb +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright 2017 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -require "helper" - -describe Google::Cloud::Spanner::Pool, :write_ratio, :mock_spanner do - let(:instance_id) { "my-instance-id" } - let(:database_id) { "my-database-id" } - let(:client) { spanner.client instance_id, database_id, pool: { min: 0, max: 4 } } - let(:tx_opts) { Google::Cloud::Spanner::V1::TransactionOptions.new(read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new) } - let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } - let(:client_pool) do - session.instance_variable_set :@last_updated_at, Time.now - p = client.instance_variable_get :@pool - p.all_sessions = [session] - p.session_stack = [session] - p - end - - after do - shutdown_client! client - end - - it "creates two sessions and one transaction" do - mock = Minitest::Mock.new - spanner.service.mocked_service = mock - sessions = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( - session: [ - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-001")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-002")) - ] - ) - mock.expect :batch_create_sessions, sessions, [{ database: database_path(instance_id, database_id), session_count: 2, session_template: nil }, default_options] - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-002-01"), tx_opts, default_options - - pool = Google::Cloud::Spanner::Pool.new client, min: 2, write_ratio: 0.5 - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 1 - _(pool.transaction_stack.size).must_equal 1 - - mock.verify - end - - it "calls batch_create_sessions until min number of sessions are returned" do - mock = Minitest::Mock.new - spanner.service.mocked_service = mock - sessions = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( - session: [ - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-001")), - ] - ) - sessions_2 = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( - session: [ - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-002")), - ] - ) - mock.expect :batch_create_sessions, sessions, [{ database: database_path(instance_id, database_id), session_count: 2, session_template: nil }, default_options] - mock.expect :batch_create_sessions, sessions_2, [{ database: database_path(instance_id, database_id), session_count: 1, session_template: nil }, default_options] - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-002-01"), tx_opts, default_options - - pool = Google::Cloud::Spanner::Pool.new client, min: 2, write_ratio: 0.5 - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 1 - _(pool.transaction_stack.size).must_equal 1 - - mock.verify - end - - it "creates five sessions and three transactions" do - mock = Minitest::Mock.new - spanner.service.mocked_service = mock - sessions = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( - session: [ - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-001")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-002")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-003")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-004")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-005")) - ] - ) - mock.expect :batch_create_sessions, sessions, [{ database: database_path(instance_id, database_id), session_count: 5, session_template: nil }, default_options] - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-003-01"), tx_opts, default_options - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-004-01"), tx_opts, default_options - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-005-01"), tx_opts, default_options - - pool = Google::Cloud::Spanner::Pool.new client, min: 5, write_ratio: 0.5 - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 5 - _(pool.session_stack.size).must_equal 2 - _(pool.transaction_stack.size).must_equal 3 - - mock.verify - end - - it "creates eight sessions and three transactions" do - mock = Minitest::Mock.new - spanner.service.mocked_service = mock - sessions = Google::Cloud::Spanner::V1::BatchCreateSessionsResponse.new( - session: [ - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-001")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-002")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-003")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-004")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-005")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-006")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-007")), - Google::Cloud::Spanner::V1::Session.new(name: session_path(instance_id, database_id, "session-008")) - ] - ) - mock.expect :batch_create_sessions, sessions, [{ database: database_path(instance_id, database_id), session_count: 8, session_template: nil }, default_options] - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-007-01"), tx_opts, default_options - expect_begin_transaction Google::Cloud::Spanner::V1::Transaction.new(id: "tx-008-01"), tx_opts, default_options - - pool = Google::Cloud::Spanner::Pool.new client, min: 8, write_ratio: 0.3 - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 8 - _(pool.session_stack.size).must_equal 6 - _(pool.transaction_stack.size).must_equal 2 - - mock.verify - end -end diff --git a/google-cloud-spanner/test/google/cloud/spanner/pool_test.rb b/google-cloud-spanner/test/google/cloud/spanner/pool_test.rb index 2b714a0..0db64a4 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/pool_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/pool_test.rb @@ -17,18 +17,22 @@ describe Google::Cloud::Spanner::Pool, :mock_spanner do let(:instance_id) { "my-instance-id" } let(:database_id) { "my-database-id" } - let(:session_id) { "session123" } + let(:session_id) { "session1" } + let(:session_id_2) { "session2" } + let(:session_id_3) { "session3" } + let(:session_id_4) { "session4" } let(:session_grpc) { Google::Cloud::Spanner::V1::Session.new name: session_path(instance_id, database_id, session_id) } + let(:session_grpc_2) { Google::Cloud::Spanner::V1::Session.new name: session_path(instance_id, database_id, session_id_2) } + let(:session_grpc_3) { Google::Cloud::Spanner::V1::Session.new name: session_path(instance_id, database_id, session_id_3) } + let(:session_grpc_4) { Google::Cloud::Spanner::V1::Session.new name: session_path(instance_id, database_id, session_id_4) } let(:session) { Google::Cloud::Spanner::Session.from_grpc session_grpc, spanner.service } let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let(:client) { spanner.client instance_id, database_id, pool: { min: 0, max: 4 } } - let(:tx_opts) { Google::Cloud::Spanner::V1::TransactionOptions.new(read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new) } let(:pool) do session.instance_variable_set :@last_updated_at, Time.now p = client.instance_variable_get :@pool - p.all_sessions = [session] - p.session_stack = [session] - p.transaction_stack = [] + p.sessions_available = [session] + p.sessions_in_use = [] p end @@ -37,56 +41,56 @@ end it "can checkout and checkin a session" do - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 + _(pool.sessions_available.size).must_equal 1 + _(pool.sessions_in_use.size).must_equal 0 s = pool.checkout_session - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 + _(pool.sessions_available.size).must_equal 0 + _(pool.sessions_in_use.size).must_equal 1 pool.checkin_session s shutdown_pool! pool - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 + _(pool.sessions_available.size).must_equal 1 + _(pool.sessions_in_use.size).must_equal 0 end it "creates new sessions when needed" do mock = Minitest::Mock.new - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] + mock.expect :create_session, session_grpc_2, [{ database: database_path(instance_id, database_id), session: nil }, default_options] spanner.service.mocked_service = mock - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 + _(pool.sessions_available.size).must_equal 1 + _(pool.sessions_in_use.size).must_equal 0 s1 = pool.checkout_session s2 = pool.checkout_session - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 0 + _(pool.sessions_available.size).must_equal 0 + _(pool.sessions_in_use.size).must_equal 2 pool.checkin_session s1 pool.checkin_session s2 shutdown_pool! pool - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 2 + _(pool.sessions_available.size).must_equal 2 + _(pool.sessions_in_use.size).must_equal 0 mock.verify end it "raises when checking out more than MAX sessions" do mock = Minitest::Mock.new - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] + mock.expect :create_session, session_grpc_2, [{ database: database_path(instance_id, database_id), session: nil }, default_options] + mock.expect :create_session, session_grpc_3, [{ database: database_path(instance_id, database_id), session: nil }, default_options] + mock.expect :create_session, session_grpc_4, [{ database: database_path(instance_id, database_id), session: nil }, default_options] spanner.service.mocked_service = mock - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 + _(pool.sessions_available.size).must_equal 1 + _(pool.sessions_in_use.size).must_equal 0 s1 = pool.checkout_session s2 = pool.checkout_session @@ -97,8 +101,8 @@ pool.checkout_session end - _(pool.all_sessions.size).must_equal 4 - _(pool.session_stack.size).must_equal 0 + _(pool.sessions_available.size).must_equal 0 + _(pool.sessions_in_use.size).must_equal 4 pool.checkin_session s1 pool.checkin_session s2 @@ -107,8 +111,8 @@ shutdown_pool! pool - _(pool.all_sessions.size).must_equal 4 - _(pool.session_stack.size).must_equal 4 + _(pool.sessions_available.size).must_equal 4 + _(pool.sessions_in_use.size).must_equal 0 mock.verify end @@ -121,230 +125,4 @@ end _(checkin_error.message).must_equal "Cannot checkin session" end - - it "uses existing transaction when checking out and checking in a transaction" do - init_tx = Google::Cloud::Spanner::Transaction.from_grpc Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-01"), pool.session_stack.shift - pool.transaction_stack << init_tx - - mock = Minitest::Mock.new - # created when checking in - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-02"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - # reload on session pool checkin - mock.expect :get_session, session_grpc, [{ name: session_grpc.name }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-02"), [{session: session_path(instance_id, database_id, session_id), options: tx_opts }, default_options] - spanner.service.mocked_service = mock - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 1 - _(pool.transaction_stack.first).must_equal init_tx - - tx = pool.checkout_transaction - _(tx).must_equal init_tx - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 0 - - pool.checkin_transaction tx - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 1 - end - - it "can create a transaction when checking out and checking in a transaction" do - mock = Minitest::Mock.new - # created when checking out - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - # created when checking in - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-02"), [{ session: session_path(instance_id, database_id, session_id), options: tx_opts }, default_options] - spanner.service.mocked_service = mock - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 - _(pool.transaction_stack.size).must_equal 0 - - tx = pool.checkout_transaction - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 0 - - pool.checkin_transaction tx - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 1 - end - - it "creates new transaction when needed" do - mock = Minitest::Mock.new - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - # created when checking out - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-002-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - spanner.service.mocked_service = mock - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 - _(pool.transaction_stack.size).must_equal 0 - - tx1 = pool.checkout_transaction - tx2 = pool.checkout_transaction - - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 0 - - pool.checkin_transaction tx1 - pool.checkin_transaction tx2 - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 2 - - mock.verify - end - - it "creates new transaction when needed using with_transaction" do - mock = Minitest::Mock.new - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - # created when checking out - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-002-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - # created when checking in - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-02"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-002-02"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - spanner.service.mocked_service = mock - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 - _(pool.transaction_stack.size).must_equal 0 - - pool.with_transaction do |tx1| - pool.with_transaction do |tx1| - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 0 - end - end - - shutdown_pool! pool - - _(pool.all_sessions.size).must_equal 2 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 2 - - mock.verify - end - - it "raises when checking out more than MAX transaction" do - mock = Minitest::Mock.new - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - mock.expect :create_session, session_grpc, [{ database: database_path(instance_id, database_id), session: nil }, default_options] - # created when checking out - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-001-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-002-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-003-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - mock.expect :begin_transaction, Google::Cloud::Spanner::V1::Transaction.new(id: "tx-004-01"), [{ - session: session_path(instance_id, database_id, session_id), options: tx_opts, - request_options: nil - }, default_options] - spanner.service.mocked_service = mock - - _(pool.all_sessions.size).must_equal 1 - _(pool.session_stack.size).must_equal 1 - _(pool.transaction_stack.size).must_equal 0 - - tx1 = pool.checkout_transaction - tx2 = pool.checkout_transaction - tx3 = pool.checkout_transaction - tx4 = pool.checkout_transaction - - assert_raises Google::Cloud::Spanner::SessionLimitError do - pool.checkout_transaction - end - - _(pool.all_sessions.size).must_equal 4 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 0 - - pool.checkin_transaction tx1 - pool.checkin_transaction tx2 - pool.checkin_transaction tx3 - pool.checkin_transaction tx4 - - _(pool.all_sessions.size).must_equal 4 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 4 - - s1 = pool.checkout_session - s2 = pool.checkout_session - - _(pool.all_sessions.size).must_equal 4 - _(pool.session_stack.size).must_equal 0 - _(pool.transaction_stack.size).must_equal 2 - - pool.checkin_session s1 - pool.checkin_session s2 - - _(pool.all_sessions.size).must_equal 4 - _(pool.session_stack.size).must_equal 2 - _(pool.transaction_stack.size).must_equal 2 - - shutdown_pool! pool - - mock.verify - end - - it "raises when checking in a transaction that does not belong" do - outside_session = Google::Cloud::Spanner::Session.from_grpc session_grpc, spanner.service - outside_tx = Google::Cloud::Spanner::Transaction.from_grpc Google::Cloud::Spanner::V1::Transaction.new(id: "outside-tx-001"), outside_session - - checkin_error = assert_raises ArgumentError do - pool.checkin_transaction outside_tx - end - _(checkin_error.message).must_equal "Cannot checkin session" - end end diff --git a/google-cloud-spanner/test/google/cloud/spanner/transaction/batch_update_test.rb b/google-cloud-spanner/test/google/cloud/spanner/transaction/batch_update_test.rb index 6475834..7b812c8 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/transaction/batch_update_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/transaction/batch_update_test.rb @@ -22,8 +22,16 @@ let(:session) { Google::Cloud::Spanner::Session.from_grpc session_grpc, spanner.service } let(:transaction_id) { "tx789" } let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } - let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + # let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc nil, session } let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_begin) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new + ) + ) + end let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let(:timestamp) { Time.parse "2017-01-01 20:04:05.06 -0700" } let(:date) { Date.parse "2017-01-02" } @@ -32,7 +40,7 @@ it "can execute a single DML query" do mock = Minitest::Mock.new mock.expect :execute_batch_dml, batch_response_grpc, [{ - session: session_grpc.name, transaction: tx_selector, + session: session_grpc.name, transaction: tx_selector_begin, statements: [statement_grpc("UPDATE users SET active = true")], seqno: 1, request_options: nil }, default_options] @@ -61,7 +69,7 @@ statements << statement_grpc("UPDATE users SET project_ids = @list", params: Google::Protobuf::Struct.new(fields: { "list" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "1"), Google::Protobuf::Value.new(string_value: "2"), Google::Protobuf::Value.new(string_value: "3")])) }), param_types: { "list" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)) }) statements << statement_grpc("UPDATE users SET settings = @dict", params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "production")])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [Google::Cloud::Spanner::V1::StructType::Field.new(name: "env", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))])) }) mock.expect :execute_batch_dml, batch_response_grpc(9), [{ - session: session_grpc.name, transaction: tx_selector, statements: statements, + session: session_grpc.name, transaction: tx_selector_begin, statements: statements, seqno: 1, request_options: nil }, default_options] session.service.mocked_service = mock @@ -96,6 +104,9 @@ def results_grpc Google::Cloud::Spanner::V1::PartialResultSet.new( metadata: Google::Cloud::Spanner::V1::ResultSetMetadata.new( + transaction: Google::Cloud::Spanner::V1::Transaction.new( + id: transaction_id + ), row_type: Google::Cloud::Spanner::V1::StructType.new( fields: [] ) @@ -114,7 +125,7 @@ def results_enum it "increases seqno for each request" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "UPDATE users SET active = true", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "UPDATE users SET active = true", transaction: tx_selector_begin, seqno: 1, options: default_options statement = statement_grpc("UPDATE users SET age = @age", params: Google::Protobuf::Struct.new(fields: { "age" => Google::Protobuf::Value.new(string_value: "29") }), param_types: { "age" => Google::Cloud::Spanner::V1::Type.new(code: :INT64) }) mock.expect :execute_batch_dml, batch_response_grpc, [{ session: session_grpc.name, transaction: tx_selector, statements: [statement], seqno: 2, request_options: nil }, default_options] expect_execute_streaming_sql results_enum, session_grpc.name, "UPDATE users SET active = false", transaction: tx_selector, seqno: 3, options: default_options @@ -141,7 +152,7 @@ def results_enum call_options = { timeout: timeout, retry_policy: retry_policy } mock = Minitest::Mock.new - mock.expect :execute_batch_dml, batch_response_grpc, [{ session: session_grpc.name, transaction: tx_selector, statements: [statement_grpc("UPDATE users SET active = true")], seqno: 1, request_options: nil }, expect_options] + mock.expect :execute_batch_dml, batch_response_grpc, [{ session: session_grpc.name, transaction: tx_selector_begin, statements: [statement_grpc("UPDATE users SET active = true")], seqno: 1, request_options: nil }, expect_options] session.service.mocked_service = mock row_counts = transaction.batch_update call_options: call_options do |b| @@ -165,7 +176,7 @@ def results_enum ) ] mock.expect :execute_batch_dml, batch_response_grpc(1), [{ - session: session_grpc.name, transaction: tx_selector, statements: statements, + session: session_grpc.name, transaction: tx_selector_begin, statements: statements, seqno: 1, request_options: { priority: :PRIORITY_MEDIUM} }, default_options] session.service.mocked_service = mock @@ -181,12 +192,12 @@ def results_enum end it "can execute a barch DML with transaction and request tag" do - transaction = Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session + transaction = Google::Cloud::Spanner::Transaction.from_grpc nil, session transaction.transaction_tag = "Tag-1" mock = Minitest::Mock.new mock.expect :execute_batch_dml, batch_response_grpc, [{ - session: session_grpc.name, transaction: tx_selector, + session: session_grpc.name, transaction: tx_selector_begin, statements: [statement_grpc("UPDATE users SET active = true")], seqno: 1, request_options: { transaction_tag: "Tag-1", request_tag: "Tag-1-1" } }, default_options] @@ -208,9 +219,22 @@ def statement_grpc sql, params: nil, param_types: {} sql: sql, params: params, param_types: param_types end + def batch_result_sets_metadata_grpc begin_transaction + if begin_transaction + Google::Cloud::Spanner::V1::ResultSetMetadata.new( + transaction: Google::Cloud::Spanner::V1::Transaction.new( + id: transaction_id + ) + ) + else + nil + end + end + def batch_result_sets_grpc count, row_count_exact: 1 - count.times.map do + count.times.map.with_index do |_, index| Google::Cloud::Spanner::V1::ResultSet.new( + metadata: batch_result_sets_metadata_grpc(index == 0), # include transaction in first result set stats: Google::Cloud::Spanner::V1::ResultSetStats.new( row_count_exact: row_count_exact ) diff --git a/google-cloud-spanner/test/google/cloud/spanner/transaction/concurrent_queries_test.rb b/google-cloud-spanner/test/google/cloud/spanner/transaction/concurrent_queries_test.rb new file mode 100644 index 0000000..55256d4 --- /dev/null +++ b/google-cloud-spanner/test/google/cloud/spanner/transaction/concurrent_queries_test.rb @@ -0,0 +1,290 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "helper" + +describe Google::Cloud::Spanner::Transaction, :mock_spanner do + let(:instance_id) { "my-instance-id" } + let(:database_id) { "my-database-id" } + let(:session_id) { "session123" } + let(:session_grpc) { Google::Cloud::Spanner::V1::Session.new name: session_path(instance_id, database_id, session_id) } + let(:session) { Google::Cloud::Spanner::Session.from_grpc session_grpc, spanner.service } + let(:transaction_id) { "tx789" } + let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } + # let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc nil, session } + let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_begin) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new + ) + ) + end + let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } + let(:tx_opts) { Google::Cloud::Spanner::V1::TransactionOptions.new(read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new) } + let(:begin_transaction_params) do + { + session: session.path, + options: tx_opts, + request_options: nil + } + end + let :results_hash do + { + metadata: { + row_type: { + fields: [ + { name: "id", type: { code: :INT64 } }, + { name: "name", type: { code: :STRING } }, + { name: "active", type: { code: :BOOL } }, + { name: "age", type: { code: :INT64 } }, + { name: "score", type: { code: :FLOAT64 } }, + { name: "updated_at", type: { code: :TIMESTAMP } }, + { name: "birthday", type: { code: :DATE} }, + { name: "avatar", type: { code: :BYTES } }, + { name: "project_ids", type: { code: :ARRAY, + array_element_type: { code: :INT64 } } } + ] + }, + transaction: { id: transaction_id }, + }, + values: [ + { string_value: "1" }, + { string_value: "Charlie" }, + { bool_value: true}, + { string_value: "29" }, + { number_value: 0.9 }, + { string_value: "2017-01-02T03:04:05.060000000Z" }, + { string_value: "1950-01-01" }, + { string_value: "aW1hZ2U=" }, + { list_value: { values: [ { string_value: "1"}, + { string_value: "2"}, + { string_value: "3"} ]}} + ] + } + end + let(:results_grpc) { Google::Cloud::Spanner::V1::PartialResultSet.new results_hash } + let(:results_enum) { Array(results_grpc).to_enum } + + let(:results_enum_tx_1) do + rh = results_hash + rh[:metadata][:transaction][:id] = "tx123" + Array( Google::Cloud::Spanner::V1::PartialResultSet.new rh ).to_enum + end + let(:results_enum_tx_2) do + rh = results_hash + rh[:metadata][:transaction][:id] = "tx456" + Array( Google::Cloud::Spanner::V1::PartialResultSet.new rh ).to_enum + end + + describe "transaction_id()" do + it "waits on other operations when transaction_id() initiates creation" do + mock = Minitest::Mock.new + session.service.mocked_service = mock + + mock.expect :begin_transaction, transaction_grpc do |received_params| + sleep 2 # simulate delayed response of rpc + received_params == begin_transaction_params + end + + mock.expect :execute_streaming_sql, results_enum do |received_params| + received_params[:transaction] == tx_selector + end + + begin + t1 = Thread.new do + tx_id = transaction.transaction_id + end + sleep 1 # Ensure t1 initiates begin_transaction before t2 initiates inline_begin + t2 = Thread.new do + results_2 = transaction.execute_query "SELECT * FROM users" + end + ensure + t1.join + t2.join + end + + mock.verify + end + + it "does not initiate begin_transaction when inline begin is executed first" do + mock = Minitest::Mock.new + session.service.mocked_service = mock + + mock.expect :execute_streaming_sql, results_enum do |received_params| + sleep 2 # simulate delayed response of rpc + received_params[:transaction] == tx_selector_begin + end + + begin + t1 = Thread.new do + transaction.execute_query "SELECT * FROM users" + end + sleep 1 # Ensure t1 initiates "inline begin" before t2 initiates begin_transaction + t2 = Thread.new do + transaction.transaction_id + end + ensure + t1.join + t2.join + end + + mock.verify + end + end + + describe "execute_query()" do + it "tests concurrent queries in a transaction" do + mock = Minitest::Mock.new + session.service.mocked_service = mock + + mock.expect :execute_streaming_sql, results_enum do |values| + sleep 2 # simulate delayed response of rpc + values[:transaction] == tx_selector_begin + end + + mock.expect :execute_streaming_sql, results_enum do |values| + values[:transaction] == tx_selector + end + + results_1 = nil + results_2 = nil + begin + t1 = Thread.new do + results_1 = transaction.execute_query "SELECT * FROM users" + end + sleep 1 # Ensure t1 initiates "begin" selector instead of t2 + t2 = Thread.new do + results_2 = transaction.execute_query "SELECT * FROM users" + end + ensure + t1.join + t2.join + end + + mock.verify + end + + it "throws exception for first operation, so second operation initiates inline" do + + mock = Minitest::Mock.new + session.service.mocked_service = mock + + mock.expect :execute_streaming_sql, results_enum do |values| + sleep 2 # simulate delayed response of rpc + values[:transaction] == tx_selector_begin + raise Google::Cloud::InvalidArgumentError + end + + mock.expect :execute_streaming_sql, results_enum do |values| + values[:transaction] == tx_selector_begin + end + + results_1 = nil + results_2 = nil + begin + t1 = Thread.new do + assert_raises Google::Cloud::InvalidArgumentError do + results_1 = transaction.execute_query "SELECT * FROM users" + end + end + sleep 1 # Ensure t1 initiates "begin" selector before t2 + t2 = Thread.new do + results_2 = transaction.execute_query "SELECT * FROM users" + end + ensure + t1.join + t2.join + end + + mock.verify + end + end + + describe "read()" do + it "tests concurrent read queries in a transaction" do + columns = [:id, :name, :active, :age, :score, :updated_at, :birthday, :avatar, :project_ids] + + mock = Minitest::Mock.new + session.service.mocked_service = mock + + mock.expect :streaming_read, results_enum do |values| + sleep 2 # simulate delayed response of rpc + values[:transaction] == tx_selector_begin + end + + mock.expect :streaming_read, results_enum do |values| + values[:transaction] == tx_selector + end + + results_1 = nil + results_2 = nil + begin + t1 = Thread.new do + results_1 = transaction.read "my-table", columns, + request_options: { tag: "Tag-1-1" } + end + sleep 1 # Ensure t1 initiates "begin" selector instead of t2 + t2 = Thread.new do + results_2 = transaction.read "my-table", columns, + request_options: { tag: "Tag-1-1" } + end + ensure + t1.join + t2.join + end + + mock.verify + end + + it "throws exception for first operation, so second operation initiates inline" do + columns = [:id, :name, :active, :age, :score, :updated_at, :birthday, :avatar, :project_ids] + + mock = Minitest::Mock.new + session.service.mocked_service = mock + + mock.expect :streaming_read, results_enum do |values| + sleep 2 # simulate delayed response of rpc + values[:transaction] == tx_selector_begin + raise Google::Cloud::InvalidArgumentError + end + + mock.expect :streaming_read, results_enum do |values| + values[:transaction] == tx_selector_begin + end + + results_1 = nil + results_2 = nil + begin + t1 = Thread.new do + assert_raises Google::Cloud::InvalidArgumentError do + results_1 = transaction.read "my-table", columns, + request_options: { tag: "Tag-1-1" } + end + end + sleep 1 # Ensure t1 initiates "begin" selector before t2 + t2 = Thread.new do + results_2 = transaction.read "my-table", columns, + request_options: { tag: "Tag-1-1" } + end + ensure + t1.join + t2.join + end + + mock.verify + end + end +end diff --git a/google-cloud-spanner/test/google/cloud/spanner/transaction/execute_query_test.rb b/google-cloud-spanner/test/google/cloud/spanner/transaction/execute_query_test.rb index b558d2e..7a0a551 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/transaction/execute_query_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/transaction/execute_query_test.rb @@ -22,8 +22,16 @@ let(:session) { Google::Cloud::Spanner::Session.from_grpc session_grpc, spanner.service } let(:transaction_id) { "tx789" } let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } - let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + # let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc nil, session } let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_begin) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new + ) + ) + end let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let :results_hash do { @@ -41,7 +49,8 @@ { name: "project_ids", type: { code: :ARRAY, array_element_type: { code: :INT64 } } } ] - } + }, + transaction: { id: transaction_id }, }, values: [ { string_value: "1" }, @@ -64,9 +73,11 @@ it "can execute a simple query" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options + _(transaction.existing_transaction?).must_equal false results = transaction.execute_query "SELECT * FROM users" + _(transaction.existing_transaction?).must_equal true mock.verify @@ -76,7 +87,7 @@ it "can execute a query with bool param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE active = @active", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "active" => Google::Protobuf::Value.new(bool_value: true) }), param_types: { "active" => Google::Cloud::Spanner::V1::Type.new(code: :BOOL) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE active = @active", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "active" => Google::Protobuf::Value.new(bool_value: true) }), param_types: { "active" => Google::Cloud::Spanner::V1::Type.new(code: :BOOL) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE active = @active", params: { active: true } @@ -88,7 +99,7 @@ it "can execute a query with int param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE age = @age", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "age" => Google::Protobuf::Value.new(string_value: "29") }), param_types: { "age" => Google::Cloud::Spanner::V1::Type.new(code: :INT64) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE age = @age", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "age" => Google::Protobuf::Value.new(string_value: "29") }), param_types: { "age" => Google::Cloud::Spanner::V1::Type.new(code: :INT64) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE age = @age", params: { age: 29 } @@ -100,7 +111,7 @@ it "can execute a query with float param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE score = @score", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "score" => Google::Protobuf::Value.new(number_value: 0.9) }), param_types: { "score" => Google::Cloud::Spanner::V1::Type.new(code: :FLOAT64) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE score = @score", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "score" => Google::Protobuf::Value.new(number_value: 0.9) }), param_types: { "score" => Google::Cloud::Spanner::V1::Type.new(code: :FLOAT64) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE score = @score", params: { score: 0.9 } @@ -114,7 +125,7 @@ mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE updated_at = @updated_at", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "updated_at" => Google::Protobuf::Value.new(string_value: "2017-01-02T03:04:05.060000000Z") }), param_types: { "updated_at" => Google::Cloud::Spanner::V1::Type.new(code: :TIMESTAMP) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE updated_at = @updated_at", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "updated_at" => Google::Protobuf::Value.new(string_value: "2017-01-02T03:04:05.060000000Z") }), param_types: { "updated_at" => Google::Cloud::Spanner::V1::Type.new(code: :TIMESTAMP) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE updated_at = @updated_at", params: { updated_at: timestamp } @@ -128,7 +139,7 @@ mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE birthday = @birthday", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "birthday" => Google::Protobuf::Value.new(string_value: "2017-01-02") }), param_types: { "birthday" => Google::Cloud::Spanner::V1::Type.new(code: :DATE) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE birthday = @birthday", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "birthday" => Google::Protobuf::Value.new(string_value: "2017-01-02") }), param_types: { "birthday" => Google::Cloud::Spanner::V1::Type.new(code: :DATE) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE birthday = @birthday", params: { birthday: date } @@ -140,7 +151,7 @@ it "can execute a query with String param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE name = @name", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "name" => Google::Protobuf::Value.new(string_value: "Charlie") }), param_types: { "name" => Google::Cloud::Spanner::V1::Type.new(code: :STRING) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE name = @name", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "name" => Google::Protobuf::Value.new(string_value: "Charlie") }), param_types: { "name" => Google::Cloud::Spanner::V1::Type.new(code: :STRING) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE name = @name", params: { name: "Charlie" } @@ -154,7 +165,7 @@ mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE avatar = @avatar", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "avatar" => Google::Protobuf::Value.new(string_value: Base64.strict_encode64("contents")) }), param_types: { "avatar" => Google::Cloud::Spanner::V1::Type.new(code: :BYTES) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE avatar = @avatar", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "avatar" => Google::Protobuf::Value.new(string_value: Base64.strict_encode64("contents")) }), param_types: { "avatar" => Google::Cloud::Spanner::V1::Type.new(code: :BYTES) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE avatar = @avatar", params: { avatar: file } @@ -166,7 +177,7 @@ it "can execute a query with an Array param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE project_ids = @list", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "list" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "1"), Google::Protobuf::Value.new(string_value: "2"), Google::Protobuf::Value.new(string_value: "3")])) }), param_types: { "list" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE project_ids = @list", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "list" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "1"), Google::Protobuf::Value.new(string_value: "2"), Google::Protobuf::Value.new(string_value: "3")])) }), param_types: { "list" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE project_ids = @list", params: { list: [1,2,3] } @@ -178,7 +189,7 @@ it "can execute a query with an empty Array param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE project_ids = @list", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "list" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [])) }), param_types: { "list" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE project_ids = @list", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "list" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [])) }), param_types: { "list" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE project_ids = @list", params: { list: [] }, types: { list: [:INT64] } @@ -190,7 +201,7 @@ it "can execute a query with a simple Hash param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE settings = @dict", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "production")])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [Google::Cloud::Spanner::V1::StructType::Field.new(name: "env", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))])) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE settings = @dict", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "production")])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [Google::Cloud::Spanner::V1::StructType::Field.new(name: "env", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))])) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE settings = @dict", params: { dict: { env: :production } } @@ -202,7 +213,7 @@ it "can execute a query with a complex Hash param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE settings = @dict", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "production"), Google::Protobuf::Value.new(number_value: 0.9), Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "1"), Google::Protobuf::Value.new(string_value: "2"), Google::Protobuf::Value.new(string_value: "3")] )) ])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [Google::Cloud::Spanner::V1::StructType::Field.new(name: "env", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "score", type: Google::Cloud::Spanner::V1::Type.new(code: :FLOAT64)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "project_ids", type: Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)))] )) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE settings = @dict", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "production"), Google::Protobuf::Value.new(number_value: 0.9), Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "1"), Google::Protobuf::Value.new(string_value: "2"), Google::Protobuf::Value.new(string_value: "3")] )) ])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [Google::Cloud::Spanner::V1::StructType::Field.new(name: "env", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "score", type: Google::Cloud::Spanner::V1::Type.new(code: :FLOAT64)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "project_ids", type: Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :INT64)))] )) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE settings = @dict", params: { dict: { env: "production", score: 0.9, project_ids: [1,2,3] } } @@ -214,7 +225,7 @@ it "can execute a query with an Array of Hashes" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE STRUCT(name, email) IN UNNEST(@data)", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "data" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "mike"), Google::Protobuf::Value.new(string_value: "mike@example.net")] )), Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "chris"), Google::Protobuf::Value.new(string_value: "chris@example.net")] ))] )) } ), param_types: { "data" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [ Google::Cloud::Spanner::V1::StructType::Field.new(name: "name", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "email", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))] ))) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE STRUCT(name, email) IN UNNEST(@data)", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "data" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "mike"), Google::Protobuf::Value.new(string_value: "mike@example.net")] )), Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "chris"), Google::Protobuf::Value.new(string_value: "chris@example.net")] ))] )) } ), param_types: { "data" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [ Google::Cloud::Spanner::V1::StructType::Field.new(name: "name", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "email", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))] ))) }, seqno: 1, options: default_options results = transaction.execute "SELECT * FROM users WHERE STRUCT(name, email) IN UNNEST(@data)", params: { data: [{ name: "mike", email: "mike@example.net" }, { name: "chris", email: "chris@example.net" }] } @@ -226,7 +237,7 @@ it "can execute a query with an Array of STRUCTs" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE STRUCT(name, email) IN UNNEST(@data)", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "data" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "mike"), Google::Protobuf::Value.new(string_value: "mike@example.net")] )), Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "chris"), Google::Protobuf::Value.new(string_value: "chris@example.net")] ))] )) } ), param_types: { "data" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [ Google::Cloud::Spanner::V1::StructType::Field.new(name: "name", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "email", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))] ))) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE STRUCT(name, email) IN UNNEST(@data)", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "data" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "mike"), Google::Protobuf::Value.new(string_value: "mike@example.net")] )), Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [Google::Protobuf::Value.new(string_value: "chris"), Google::Protobuf::Value.new(string_value: "chris@example.net")] ))] )) } ), param_types: { "data" => Google::Cloud::Spanner::V1::Type.new(code: :ARRAY, array_element_type: Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [ Google::Cloud::Spanner::V1::StructType::Field.new(name: "name", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING)), Google::Cloud::Spanner::V1::StructType::Field.new(name: "email", type: Google::Cloud::Spanner::V1::Type.new(code: :STRING))] ))) }, seqno: 1, options: default_options struct_fields = transaction.fields name: :STRING, email: :STRING results = transaction.execute "SELECT * FROM users WHERE STRUCT(name, email) IN UNNEST(@data)", params: { data: [struct_fields.data(["mike", "mike@example.net"]), struct_fields.data(["chris","chris@example.net"])] } @@ -239,7 +250,7 @@ it "can execute a query with an empty Hash param" do mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE settings = @dict", transaction: tx_selector, params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [])) }, seqno: 1, options: default_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users WHERE settings = @dict", transaction: tx_selector_begin, params: Google::Protobuf::Struct.new(fields: { "dict" => Google::Protobuf::Value.new(list_value: Google::Protobuf::ListValue.new(values: [])) }), param_types: { "dict" => Google::Cloud::Spanner::V1::Type.new(code: :STRUCT, struct_type: Google::Cloud::Spanner::V1::StructType.new(fields: [])) }, seqno: 1, options: default_options results = transaction.execute_query "SELECT * FROM users WHERE settings = @dict", params: { dict: { } } @@ -252,7 +263,7 @@ expect_query_options = { optimizer_version: "4", optimizer_statistics_package: "auto_20191128_14_47_22UTC" } mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: default_options, query_options: expect_query_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: default_options, query_options: expect_query_options results = transaction.execute_query "SELECT * FROM users", query_options: expect_query_options @@ -274,7 +285,7 @@ mock = Minitest::Mock.new session.service.mocked_service = mock - expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector, seqno: 1, options: expect_options + expect_execute_streaming_sql results_enum, session_grpc.name, "SELECT * FROM users", transaction: tx_selector_begin, seqno: 1, options: expect_options results = transaction.execute_query "SELECT * FROM users", call_options: call_options @@ -304,6 +315,10 @@ def assert_results results _(results).must_be_kind_of Google::Cloud::Spanner::Results + metadata = results.metadata + + _(metadata.transaction.id).must_equal transaction_id + _(results.fields).wont_be :nil? _(results.fields).must_be_kind_of Google::Cloud::Spanner::Fields _(results.fields.keys.count).must_equal 9 diff --git a/google-cloud-spanner/test/google/cloud/spanner/transaction/read_test.rb b/google-cloud-spanner/test/google/cloud/spanner/transaction/read_test.rb index 302751a..aff12cd 100644 --- a/google-cloud-spanner/test/google/cloud/spanner/transaction/read_test.rb +++ b/google-cloud-spanner/test/google/cloud/spanner/transaction/read_test.rb @@ -22,8 +22,16 @@ let(:session) { Google::Cloud::Spanner::Session.from_grpc session_grpc, spanner.service } let(:transaction_id) { "tx789" } let(:transaction_grpc) { Google::Cloud::Spanner::V1::Transaction.new id: transaction_id } - let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + # let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session } + let(:transaction) { Google::Cloud::Spanner::Transaction.from_grpc nil, session } let(:tx_selector) { Google::Cloud::Spanner::V1::TransactionSelector.new id: transaction_id } + let(:tx_selector_begin) do + Google::Cloud::Spanner::V1::TransactionSelector.new( + begin: Google::Cloud::Spanner::V1::TransactionOptions.new( + read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new + ) + ) + end let(:default_options) { ::Gapic::CallOptions.new metadata: { "google-cloud-resource-prefix" => database_path(instance_id, database_id) } } let :results_hash1 do { @@ -41,7 +49,8 @@ { name: "project_ids", type: { code: :ARRAY, array_element_type: { code: :INT64 } } } ] - } + }, + transaction: { id: transaction_id }, } } end @@ -82,7 +91,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true), - transaction: tx_selector, index: nil, limit: nil, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: nil, limit: nil, resume_token: nil, partition_token: nil, request_options: nil }, default_options] session.service.mocked_service = mock @@ -102,7 +111,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(keys: [Google::Cloud::Spanner::Convert.object_to_grpc_value([1]).list_value, Google::Cloud::Spanner::Convert.object_to_grpc_value([2]).list_value, Google::Cloud::Spanner::Convert.object_to_grpc_value([3]).list_value]), - transaction: tx_selector, index: nil, limit: nil, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: nil, limit: nil, resume_token: nil, partition_token: nil, request_options: nil }, default_options] session.service.mocked_service = mock @@ -122,7 +131,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(keys: [Google::Cloud::Spanner::Convert.object_to_grpc_value([1,1]).list_value, Google::Cloud::Spanner::Convert.object_to_grpc_value([2,2]).list_value, Google::Cloud::Spanner::Convert.object_to_grpc_value([3,3]).list_value]), - transaction: tx_selector, index: "MyTableCompositeKey", limit: nil, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: "MyTableCompositeKey", limit: nil, resume_token: nil, partition_token: nil, request_options: nil }, default_options] session.service.mocked_service = mock @@ -142,7 +151,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(ranges: [Google::Cloud::Spanner::Convert.to_key_range([1,1]..[3,3])]), - transaction: tx_selector, index: "MyTableCompositeKey", limit: nil, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: "MyTableCompositeKey", limit: nil, resume_token: nil, partition_token: nil, request_options: nil }, default_options] session.service.mocked_service = mock @@ -163,7 +172,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true), - transaction: tx_selector, index: nil, limit: 5, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: nil, limit: 5, resume_token: nil, partition_token: nil, request_options: nil }, default_options] session.service.mocked_service = mock @@ -183,7 +192,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(keys: [Google::Cloud::Spanner::Convert.object_to_grpc_value([1]).list_value]), - transaction: tx_selector, index: nil, limit: 1, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: nil, limit: 1, resume_token: nil, partition_token: nil, request_options: nil }, default_options] session.service.mocked_service = mock @@ -213,7 +222,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true), - transaction: tx_selector, index: nil, limit: nil, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: nil, limit: nil, resume_token: nil, partition_token: nil, request_options: nil }, expect_options] session.service.mocked_service = mock @@ -226,7 +235,7 @@ end it "execute read with transaction and request tag" do - transaction = Google::Cloud::Spanner::Transaction.from_grpc transaction_grpc, session + transaction = Google::Cloud::Spanner::Transaction.from_grpc nil, session transaction.transaction_tag = "Tag-1" columns = [:id, :name, :active, :age, :score, :updated_at, :birthday, :avatar, :project_ids] @@ -235,7 +244,7 @@ session: session_grpc.name, table: "my-table", columns: ["id", "name", "active", "age", "score", "updated_at", "birthday", "avatar", "project_ids"], key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true), - transaction: tx_selector, index: nil, limit: nil, resume_token: nil, partition_token: nil, + transaction: tx_selector_begin, index: nil, limit: nil, resume_token: nil, partition_token: nil, request_options: { transaction_tag: "Tag-1", request_tag: "Tag-1-1" } }, default_options] session.service.mocked_service = mock diff --git a/google-cloud-spanner/test/helper.rb b/google-cloud-spanner/test/helper.rb index 788903e..2a6643c 100644 --- a/google-cloud-spanner/test/helper.rb +++ b/google-cloud-spanner/test/helper.rb @@ -38,7 +38,8 @@ def shutdown_client! client # extract the pool pool = client.instance_variable_get :@pool # remove all sessions so we don't have to handle the calls to session_delete - pool.all_sessions = [] + pool.sessions_available = [] + pool.sessions_in_use = [] # close the client client.close