Skip to content

Commit

Permalink
feat(google-cloud-spanner): Implement "Inline Begin Transaction" (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandeepTuniki committed Sep 5, 2023
1 parent f316f3b commit 49ac4d9
Show file tree
Hide file tree
Showing 27 changed files with 880 additions and 870 deletions.
3 changes: 2 additions & 1 deletion .toys/ci.rb
Expand Up @@ -123,7 +123,8 @@ def setup_auth_env
"GCLOUD_TEST_PROJECT" => final_project,
"GOOGLE_CLOUD_PROJECT" => final_project,
"GCLOUD_TEST_KEYFILE" => final_keyfile,
"GOOGLE_APPLICATION_CREDENTIALS" => ENV["GOOGLE_APPLICATION_CREDENTIALS"]
"GOOGLE_APPLICATION_CREDENTIALS" => ENV["GOOGLE_APPLICATION_CREDENTIALS"],
"MT_COMPAT" => "true"
}
end

Expand Down
Expand Up @@ -80,7 +80,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

row_counts = tx.batch_update do |b|
b.batch_update insert_dml[dialect], params: insert_params[dialect]
Expand All @@ -107,7 +107,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

err = expect do
tx.batch_update { |b| } # rubocop:disable Lint/EmptyBlock
Expand All @@ -124,7 +124,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true
begin
tx.batch_update do |b|
b.batch_update insert_dml[dialect], params: insert_params[dialect]
Expand All @@ -148,12 +148,31 @@
_(timestamp).must_be_kind_of Time
end

it "raises BatchUpdateError when the first statement in Batch DML is a syntax error for #{dialect}" do
prior_results = db[dialect].execute_sql "SELECT * FROM accounts"
_(prior_results.rows.count).must_equal 3
db[dialect].transaction do |tx|
begin
_(tx.no_existing_transaction?).must_equal true
tx.batch_update do |b|
b.batch_update update_dml_syntax_error, params: update_params[dialect]
end
rescue Google::Cloud::Spanner::BatchUpdateError => e
_(e.cause).must_be_kind_of Google::Cloud::InvalidArgumentError
_(e.cause.message).must_equal "Statement 0: 'UPDDDD accounts' is not valid DML."
end
_(tx.no_existing_transaction?).must_equal true
end
prior_results = db[dialect].execute_sql "SELECT * FROM accounts"
_(prior_results.rows.count).must_equal 3
end

it "runs execute_update and batch_update in the same transaction for #{dialect}" do
prior_results = db[dialect].execute_sql "SELECT * FROM accounts"
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

row_counts = tx.batch_update do |b|
b.batch_update insert_dml[dialect], params: insert_params[dialect]
Expand Down
6 changes: 3 additions & 3 deletions google-cloud-spanner/acceptance/spanner/client/dml_test.rb
Expand Up @@ -71,7 +71,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

# Execute a DML using execute_update and make sure data is updated and correct count is returned.
insert_row_count = tx.execute_update \
Expand Down Expand Up @@ -112,7 +112,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

# Execute a DML using execute_update and make sure data is updated and correct count is returned.
insert_row_count = tx.execute_update \
Expand Down Expand Up @@ -141,7 +141,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

# Execute a DML statement, followed by calling existing insert method,
# commit the transaction and assert that both the updates are present.
Expand Down
34 changes: 30 additions & 4 deletions google-cloud-spanner/acceptance/spanner/client/transaction_test.rb
Expand Up @@ -35,9 +35,35 @@
db.delete "accounts"
end

it "runs basic inline begin transaction" do
db.transaction do |tx|
_(tx.no_existing_transaction?).must_equal true
tx_results = tx.execute_query "SELECT * from accounts"
_(tx.transaction_id).wont_be :nil?
_(tx_results.rows.count).must_equal default_account_rows.length
end
end

it "re-uses existing transaction for multiple queries" do
db.transaction do |tx|
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.execute_query "SELECT * from accounts"
tx_id_1 = tx.transaction_id
_(tx_id_1).wont_be :nil?
_(tx_results.rows.count).must_equal default_account_rows.length

tx_results_2 = tx.execute_query "SELECT * from accounts WHERE active = true"
tx_id_2 = tx.transaction_id
_(tx_id_2).wont_be :nil?
_(tx_id_1).must_equal tx_id_2
_(tx_results_2.rows.count).must_equal 2
end
end

it "modifies accounts and verifies data with reads" do
timestamp = db.transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.read "accounts", columns
_(tx_results).must_be_kind_of Google::Cloud::Spanner::Results
Expand Down Expand Up @@ -70,7 +96,7 @@

it "can rollback a transaction without passing on using Rollback" do
timestamp = db.transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.read "accounts", columns
_(tx_results).must_be_kind_of Google::Cloud::Spanner::Results
Expand Down Expand Up @@ -101,7 +127,7 @@
it "can rollback a transaction and pass on the error" do
assert_raises ZeroDivisionError do
db.transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.read "accounts", columns
_(tx_results).must_be_kind_of Google::Cloud::Spanner::Results
Expand Down Expand Up @@ -220,7 +246,7 @@

commit_options = { return_commit_stats: true }
commit_resp = db.transaction commit_options: commit_options do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx.insert "accounts", additional_account
end
Expand Down
@@ -0,0 +1,50 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


module Google
module Cloud
module Spanner
##
# @private Helper class to process BatchDML response
class BatchUpdateResults
## Object of type
# Google::Cloud::Spanner::V1::ExecuteBatchDmlResponse
attr_reader :grpc

def initialize grpc
@grpc = grpc
end

def row_counts
if @grpc.status.code.zero?
@grpc.result_sets.map { |rs| rs.stats.row_count_exact }
else
begin
raise Google::Cloud::Error.from_error @grpc.status
rescue Google::Cloud::Error
raise Google::Cloud::Spanner::BatchUpdateError.from_grpc @grpc
end
end
end

##
# Returns transaction if available. Otherwise returns nil
def transaction
@grpc&.result_sets&.first&.metadata&.transaction
end
end
end
end
end
49 changes: 31 additions & 18 deletions google-cloud-spanner/lib/google/cloud/spanner/client.rb
Expand Up @@ -49,6 +49,10 @@ module Spanner
# end
#
class Client
##
# @private
IS_TRANSACTION_RUNNING_KEY = "ruby_spanner_is_transaction_running".freeze

##
# @private Creates a new Spanner Client instance.
def initialize project, instance_id, database_id, session_labels: nil,
Expand Down Expand Up @@ -1634,6 +1638,7 @@ def commit commit_options: nil, request_options: nil,
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/BlockLength


##
# Creates a transaction for reads and writes that execute atomically at
# a single logical point in time across columns, rows, and tables in a
Expand Down Expand Up @@ -1788,7 +1793,7 @@ def commit commit_options: nil, request_options: nil,
def transaction deadline: 120, commit_options: nil,
request_options: nil, call_options: nil
ensure_service!
unless Thread.current[:transaction_id].nil?
unless Thread.current[IS_TRANSACTION_RUNNING_KEY].nil?
raise "Nested transactions are not allowed"
end

Expand All @@ -1799,17 +1804,20 @@ def transaction deadline: 120, commit_options: nil,
request_options = Convert.to_request_options \
request_options, tag_type: :transaction_tag

@pool.with_transaction do |tx|
@pool.with_session do |session|
tx = session.create_empty_transaction
if request_options
tx.transaction_tag = request_options[:transaction_tag]
end

begin
Thread.current[:transaction_id] = tx.transaction_id
Thread.current[IS_TRANSACTION_RUNNING_KEY] = true
yield tx
transaction_id = nil
transaction_id = tx.transaction_id if tx.existing_transaction?
commit_resp = @project.service.commit \
tx.session.path, tx.mutations,
transaction_id: tx.transaction_id,
transaction_id: transaction_id,
commit_options: commit_options,
request_options: request_options,
call_options: call_options
Expand All @@ -1819,28 +1827,21 @@ def transaction deadline: 120, commit_options: nil,
Google::Cloud::AbortedError,
GRPC::Internal,
Google::Cloud::InternalError => e
raise e if internal_error_and_not_retryable? e
# Re-raise if deadline has passed
if current_time - start_time > deadline
if e.is_a? GRPC::BadStatus
e = Google::Cloud::Error.from_error e
end
raise e
end
check_and_propagate_err! e, (current_time - start_time > deadline)
# Sleep the amount from RetryDelay, or incremental backoff
sleep(delay_from_aborted(e) || backoff *= 1.3)
# Create new transaction on the session and retry the block
tx = tx.session.create_transaction
tx = session.create_transaction
retry
rescue StandardError => e
# Rollback transaction when handling unexpected error
tx.session.rollback tx.transaction_id
tx.session.rollback tx.transaction_id if tx.existing_transaction?
# Return nil if raised with rollback.
return nil if e.is_a? Rollback
# Re-raise error.
raise e
ensure
Thread.current[:transaction_id] = nil
Thread.current[IS_TRANSACTION_RUNNING_KEY] = nil
end
end
end
Expand Down Expand Up @@ -1923,7 +1924,7 @@ def snapshot strong: nil, timestamp: nil, read_timestamp: nil,
exact_staleness: exact_staleness

ensure_service!
unless Thread.current[:transaction_id].nil?
unless Thread.current[IS_TRANSACTION_RUNNING_KEY].nil?
raise "Nested snapshots are not allowed"
end

Expand All @@ -1933,11 +1934,11 @@ def snapshot strong: nil, timestamp: nil, read_timestamp: nil,
timestamp: (timestamp || read_timestamp),
staleness: (staleness || exact_staleness),
call_options: call_options
Thread.current[:transaction_id] = snp_grpc.id
Thread.current[IS_TRANSACTION_RUNNING_KEY] = true
snp = Snapshot.from_grpc snp_grpc, session
yield snp if block_given?
ensure
Thread.current[:transaction_id] = nil
Thread.current[IS_TRANSACTION_RUNNING_KEY] = nil
end
nil
end
Expand Down Expand Up @@ -2272,6 +2273,18 @@ def delay_from_aborted err
nil
end

##
# Determines if a transaction error should be propagated to the user.
# And re-raises the error accordingly
def check_and_propagate_err! err, deadline_passed
raise err if internal_error_and_not_retryable? err
return unless deadline_passed
if err.is_a? GRPC::BadStatus
raise Google::Cloud::Error.from_error err
end
raise err
end

def internal_error_and_not_retryable? error
(error.instance_of?(Google::Cloud::InternalError) ||
error.instance_of?(GRPC::Internal)) &&
Expand Down

0 comments on commit 49ac4d9

Please sign in to comment.