Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(google-cloud-spanner): Implement "Inline Begin Transaction" #54

Merged
merged 100 commits into from Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
96cfc0f
Rename queues to stacks in sessions
SandeepTuniki Apr 6, 2023
cad4aeb
Merge branch 'main' into inline-begin-transaction
SandeepTuniki May 29, 2023
a7e1f0f
don't use transaction_stack
SandeepTuniki May 30, 2023
f861938
fix a bug
SandeepTuniki May 30, 2023
4b6fcc9
skip tests temporarily
SandeepTuniki May 30, 2023
edf5e67
some changes
SandeepTuniki May 31, 2023
756186e
remove obsolete code
SandeepTuniki May 31, 2023
073500a
remove obsolete tests
SandeepTuniki May 31, 2023
04ed2af
Remove write_ratio field from Pool class
SandeepTuniki Jun 1, 2023
de985db
use boolean field to track progress of transaction instead of transac…
SandeepTuniki Jun 19, 2023
fd17c22
Defer creating gapic transaction
SandeepTuniki Jul 18, 2023
f79eb32
Merge branch 'main' into ilb-with-write-ratio
SandeepTuniki Jul 18, 2023
6e6c922
Remove transaction_stack from Pool class
SandeepTuniki Jul 18, 2023
83832a9
Use a set to keep track of checked out sessions
SandeepTuniki Jul 18, 2023
b7327ed
Revert "Use a set to keep track of checked out sessions"
SandeepTuniki Jul 19, 2023
819b4d0
Remove transaction related code from Pool class
SandeepTuniki Jul 19, 2023
d08198e
Add basic ILB for execute_query method
SandeepTuniki Jul 19, 2023
a0034b4
Add acceptance test for a basic inline begin transaction
SandeepTuniki Jul 20, 2023
2ebcacc
Add another (failing) test
SandeepTuniki Jul 21, 2023
121ffb7
Make transaction available to subsequent statements through inline begin
SandeepTuniki Jul 21, 2023
1f64c8b
Change helper script to use existing resources
SandeepTuniki Jul 21, 2023
7cc8106
Remove useless parameter
SandeepTuniki Jul 21, 2023
5566487
Add ILB for batch_update
SandeepTuniki Jul 24, 2023
5fcaa93
Change some batch_update tests
SandeepTuniki Jul 24, 2023
0368dd7
Remove debug statements
SandeepTuniki Jul 24, 2023
14d5361
Add ILB for tx.read API
SandeepTuniki Jul 24, 2023
85392ce
Preserve public behviour of transaction_id method
SandeepTuniki Jul 27, 2023
23051d8
Remove obsolete checks in acceptance tests
SandeepTuniki Jul 27, 2023
75cf6bb
Remove obsolete check in acceptance test
SandeepTuniki Jul 27, 2023
770cf74
Fix some unit tests in transaction_test.rb
SandeepTuniki Jul 28, 2023
e038e9a
Fix more broken tests
SandeepTuniki Jul 28, 2023
2daa3a3
Fix more tests
SandeepTuniki Jul 28, 2023
e049c46
Remove focus from all unit tests
SandeepTuniki Jul 28, 2023
e5abbb2
Fix broken unit tests
SandeepTuniki Jul 28, 2023
1261c9c
Remove focus on a unit test
SandeepTuniki Jul 31, 2023
aa1bed1
Fix failing unit tests related to rollbacks
SandeepTuniki Jul 31, 2023
f174b40
Clarify behaviour of unit tests
SandeepTuniki Jul 31, 2023
eb2b240
Fix failing unit tests for transaction retries
SandeepTuniki Aug 1, 2023
9580949
Remove obsolete tests
SandeepTuniki Aug 1, 2023
21076f8
Remove focus keyword from all unit tests
SandeepTuniki Aug 1, 2023
22c5a79
Don't focus any acceptance test
SandeepTuniki Aug 1, 2023
921a334
Remove all the commented out code from unit tests
SandeepTuniki Aug 1, 2023
3a1d3cf
Minitest fix
SandeepTuniki Aug 1, 2023
0cd4061
Remove debug gem imports
SandeepTuniki Aug 1, 2023
0eb8af5
Remove print and debugging statements
SandeepTuniki Aug 1, 2023
da5ba46
Revert spanner_helper code
SandeepTuniki Aug 1, 2023
64305c7
Improve acceptance test checks
SandeepTuniki Aug 1, 2023
263d5e3
Replace an "it" statement that was mistakenly removed
SandeepTuniki Aug 1, 2023
0fea00e
Minor improvement
SandeepTuniki Aug 1, 2023
933e23d
Format code and documentation
SandeepTuniki Aug 1, 2023
72a65a9
Add basic code to handle parallel opearations within ILB
SandeepTuniki Aug 3, 2023
445c72f
Remove scratchpad code
SandeepTuniki Aug 3, 2023
31be4b1
Move concurrent queries test to a separate file
SandeepTuniki Aug 3, 2023
a4462b5
Fix an edge case
SandeepTuniki Aug 3, 2023
cf9481f
Fix failing BatchUpdate acceptance error
SandeepTuniki Aug 7, 2023
202bf4e
Add comment to clarify intent
SandeepTuniki Aug 7, 2023
f91d478
Merge branch 'ilb-with-write-ratio' into ilb-with-parallel-queries
SandeepTuniki Aug 7, 2023
3074043
Refactor code for batch_update
SandeepTuniki Aug 7, 2023
9c3fa71
Fix BatchDML failing acceptance test
SandeepTuniki Aug 8, 2023
c21b2e3
Simplify with helper method
SandeepTuniki Aug 8, 2023
206b87b
Revert changes to BatchUpdateError class
SandeepTuniki Aug 8, 2023
24617ac
Merge branch 'ilb-with-write-ratio' into ilb-with-parallel-queries
SandeepTuniki Aug 8, 2023
2e95964
Fix concurrent transactions case for execute_query
SandeepTuniki Aug 8, 2023
232f22f
Fix failing batch DML acceptance test
SandeepTuniki Aug 8, 2023
b08732b
Merge branch 'ilb-with-write-ratio' into ilb-with-parallel-queries
SandeepTuniki Aug 9, 2023
17781ea
Fix a typo and comment the code
SandeepTuniki Aug 9, 2023
c51ed60
Improve the code readability
SandeepTuniki Aug 9, 2023
4a1eb16
Improve code readability
SandeepTuniki Aug 9, 2023
ae466cc
Handle error case during inline operation in Transaction
SandeepTuniki Aug 10, 2023
a6804a8
Add mutex for tx.read() method
SandeepTuniki Aug 16, 2023
2660ce1
Refactor mutex processing into a helper method
SandeepTuniki Aug 16, 2023
ac55f19
Move begin_transaction into safe_execute
SandeepTuniki Aug 16, 2023
b43974f
Improve method description
SandeepTuniki Aug 16, 2023
c526661
Improve code readability
SandeepTuniki Aug 16, 2023
afba31b
Simplify code further
SandeepTuniki Aug 16, 2023
52f5949
Remove obsolete instance variable
SandeepTuniki Aug 16, 2023
d1a1081
Reduce cyclomatic complexity of transaction method
SandeepTuniki Aug 16, 2023
63f85ca
Rename method
SandeepTuniki Aug 16, 2023
bb3d589
Merge branch 'ilb-with-write-ratio' into ilb-with-parallel-queries
SandeepTuniki Aug 18, 2023
238f789
Use set to keep track of checked out sessions
SandeepTuniki Aug 26, 2023
977e3cb
Revert "Use set to keep track of checked out sessions"
SandeepTuniki Aug 26, 2023
9823cca
Replace all_sessions/session_stack with sessions_available/sessions_i…
SandeepTuniki Aug 26, 2023
cdda7ab
Remove all references to all_sessions
SandeepTuniki Aug 26, 2023
6eba750
Merge branch 'ilb-with-write-ratio' into ilb-with-parallel-queries
SandeepTuniki Aug 26, 2023
66e25f4
Simplify BatchUpdateResults class
SandeepTuniki Aug 26, 2023
1d2137a
Merge branch 'ilb-with-write-ratio' into ilb-with-parallel-queries
SandeepTuniki Aug 26, 2023
f24f30f
Add concurrency test for Transaction#read() method
SandeepTuniki Aug 26, 2023
34c1155
Fix a potential bug
SandeepTuniki Aug 27, 2023
b984c37
Modify unit tests for batch_update
SandeepTuniki Aug 27, 2023
7b9419a
Modify unit tests for read
SandeepTuniki Aug 27, 2023
21fae4e
Minor refactorings
SandeepTuniki Aug 27, 2023
48fac2e
Cleanup code
SandeepTuniki Aug 27, 2023
9d6ff7c
Add comments
SandeepTuniki Aug 27, 2023
5650b0d
Maintain consistency in using accessors over instance variables
SandeepTuniki Aug 29, 2023
3e304a9
Simplify thread-safety handling code in Transaction class
SandeepTuniki Aug 29, 2023
44a0f96
Fix race condition for transaction_id() method
SandeepTuniki Aug 29, 2023
a34cddc
Document mutex usage
SandeepTuniki Aug 29, 2023
e36fbf7
Use a constant variable instead of :is_transaction_running symbol
SandeepTuniki Aug 29, 2023
72882dd
Rename next_seqno to indicate thread synchronization
SandeepTuniki Aug 30, 2023
ddbb7e9
Update documentation to include more info about mutexes & synchroniza…
SandeepTuniki Aug 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .toys/ci.rb
Expand Up @@ -123,7 +123,8 @@ def setup_auth_env
"GCLOUD_TEST_PROJECT" => final_project,
"GOOGLE_CLOUD_PROJECT" => final_project,
"GCLOUD_TEST_KEYFILE" => final_keyfile,
"GOOGLE_APPLICATION_CREDENTIALS" => ENV["GOOGLE_APPLICATION_CREDENTIALS"]
"GOOGLE_APPLICATION_CREDENTIALS" => ENV["GOOGLE_APPLICATION_CREDENTIALS"],
"MT_COMPAT" => "true"
}
end

Expand Down
Expand Up @@ -80,7 +80,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

row_counts = tx.batch_update do |b|
b.batch_update insert_dml[dialect], params: insert_params[dialect]
Expand All @@ -107,7 +107,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

err = expect do
tx.batch_update { |b| } # rubocop:disable Lint/EmptyBlock
Expand All @@ -124,7 +124,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true
begin
tx.batch_update do |b|
b.batch_update insert_dml[dialect], params: insert_params[dialect]
Expand All @@ -148,12 +148,31 @@
_(timestamp).must_be_kind_of Time
end

it "raises BatchUpdateError when the first statement in Batch DML is a syntax error for #{dialect}" do
prior_results = db[dialect].execute_sql "SELECT * FROM accounts"
_(prior_results.rows.count).must_equal 3
db[dialect].transaction do |tx|
begin
_(tx.no_existing_transaction?).must_equal true
tx.batch_update do |b|
b.batch_update update_dml_syntax_error, params: update_params[dialect]
end
rescue Google::Cloud::Spanner::BatchUpdateError => e
_(e.cause).must_be_kind_of Google::Cloud::InvalidArgumentError
_(e.cause.message).must_equal "Statement 0: 'UPDDDD accounts' is not valid DML."
end
_(tx.no_existing_transaction?).must_equal true
end
prior_results = db[dialect].execute_sql "SELECT * FROM accounts"
_(prior_results.rows.count).must_equal 3
end

it "runs execute_update and batch_update in the same transaction for #{dialect}" do
prior_results = db[dialect].execute_sql "SELECT * FROM accounts"
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

row_counts = tx.batch_update do |b|
b.batch_update insert_dml[dialect], params: insert_params[dialect]
Expand Down
6 changes: 3 additions & 3 deletions google-cloud-spanner/acceptance/spanner/client/dml_test.rb
Expand Up @@ -71,7 +71,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

# Execute a DML using execute_update and make sure data is updated and correct count is returned.
insert_row_count = tx.execute_update \
Expand Down Expand Up @@ -112,7 +112,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

# Execute a DML using execute_update and make sure data is updated and correct count is returned.
insert_row_count = tx.execute_update \
Expand Down Expand Up @@ -141,7 +141,7 @@
_(prior_results.rows.count).must_equal 3

timestamp = db[dialect].transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

# Execute a DML statement, followed by calling existing insert method,
# commit the transaction and assert that both the updates are present.
Expand Down
Expand Up @@ -35,9 +35,35 @@
db.delete "accounts"
end

it "runs basic inline begin transaction" do
db.transaction do |tx|
_(tx.no_existing_transaction?).must_equal true
tx_results = tx.execute_query "SELECT * from accounts"
_(tx.transaction_id).wont_be :nil?
_(tx_results.rows.count).must_equal default_account_rows.length
end
end

it "re-uses existing transaction for multiple queries" do
db.transaction do |tx|
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.execute_query "SELECT * from accounts"
tx_id_1 = tx.transaction_id
_(tx_id_1).wont_be :nil?
_(tx_results.rows.count).must_equal default_account_rows.length

tx_results_2 = tx.execute_query "SELECT * from accounts WHERE active = true"
tx_id_2 = tx.transaction_id
_(tx_id_2).wont_be :nil?
_(tx_id_1).must_equal tx_id_2
_(tx_results_2.rows.count).must_equal 2
end
end

it "modifies accounts and verifies data with reads" do
timestamp = db.transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.read "accounts", columns
_(tx_results).must_be_kind_of Google::Cloud::Spanner::Results
Expand Down Expand Up @@ -70,7 +96,7 @@

it "can rollback a transaction without passing on using Rollback" do
timestamp = db.transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.read "accounts", columns
_(tx_results).must_be_kind_of Google::Cloud::Spanner::Results
Expand Down Expand Up @@ -101,7 +127,7 @@
it "can rollback a transaction and pass on the error" do
assert_raises ZeroDivisionError do
db.transaction do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx_results = tx.read "accounts", columns
_(tx_results).must_be_kind_of Google::Cloud::Spanner::Results
Expand Down Expand Up @@ -220,7 +246,7 @@

commit_options = { return_commit_stats: true }
commit_resp = db.transaction commit_options: commit_options do |tx|
_(tx.transaction_id).wont_be :nil?
_(tx.no_existing_transaction?).must_equal true

tx.insert "accounts", additional_account
end
Expand Down
@@ -0,0 +1,50 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


module Google
module Cloud
module Spanner
##
# @private Helper class to process BatchDML response
class BatchUpdateResults
## Object of type
# Google::Cloud::Spanner::V1::ExecuteBatchDmlResponse
attr_reader :grpc

def initialize grpc
@grpc = grpc
end

def row_counts
if @grpc.status.code.zero?
@grpc.result_sets.map { |rs| rs.stats.row_count_exact }
else
begin
raise Google::Cloud::Error.from_error @grpc.status
rescue Google::Cloud::Error
raise Google::Cloud::Spanner::BatchUpdateError.from_grpc @grpc
end
end
end

##
# Returns transaction if available. Otherwise returns nil
def transaction
@grpc&.result_sets&.first&.metadata&.transaction
end
end
end
end
end
49 changes: 31 additions & 18 deletions google-cloud-spanner/lib/google/cloud/spanner/client.rb
Expand Up @@ -49,6 +49,10 @@ module Spanner
# end
#
class Client
##
# @private
IS_TRANSACTION_RUNNING_KEY = "ruby_spanner_is_transaction_running".freeze

##
# @private Creates a new Spanner Client instance.
def initialize project, instance_id, database_id, session_labels: nil,
Expand Down Expand Up @@ -1634,6 +1638,7 @@ def commit commit_options: nil, request_options: nil,
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/BlockLength


##
# Creates a transaction for reads and writes that execute atomically at
# a single logical point in time across columns, rows, and tables in a
Expand Down Expand Up @@ -1788,7 +1793,7 @@ def commit commit_options: nil, request_options: nil,
def transaction deadline: 120, commit_options: nil,
request_options: nil, call_options: nil
ensure_service!
unless Thread.current[:transaction_id].nil?
unless Thread.current[IS_TRANSACTION_RUNNING_KEY].nil?
raise "Nested transactions are not allowed"
end

Expand All @@ -1799,17 +1804,20 @@ def transaction deadline: 120, commit_options: nil,
request_options = Convert.to_request_options \
request_options, tag_type: :transaction_tag

@pool.with_transaction do |tx|
@pool.with_session do |session|
tx = session.create_empty_transaction
if request_options
tx.transaction_tag = request_options[:transaction_tag]
end

begin
Thread.current[:transaction_id] = tx.transaction_id
Thread.current[IS_TRANSACTION_RUNNING_KEY] = true
yield tx
transaction_id = nil
transaction_id = tx.transaction_id if tx.existing_transaction?
commit_resp = @project.service.commit \
tx.session.path, tx.mutations,
transaction_id: tx.transaction_id,
transaction_id: transaction_id,
commit_options: commit_options,
request_options: request_options,
call_options: call_options
Expand All @@ -1819,28 +1827,21 @@ def transaction deadline: 120, commit_options: nil,
Google::Cloud::AbortedError,
GRPC::Internal,
Google::Cloud::InternalError => e
raise e if internal_error_and_not_retryable? e
# Re-raise if deadline has passed
if current_time - start_time > deadline
if e.is_a? GRPC::BadStatus
e = Google::Cloud::Error.from_error e
end
raise e
end
check_and_propagate_err! e, (current_time - start_time > deadline)
# Sleep the amount from RetryDelay, or incremental backoff
sleep(delay_from_aborted(e) || backoff *= 1.3)
# Create new transaction on the session and retry the block
tx = tx.session.create_transaction
tx = session.create_transaction
retry
rescue StandardError => e
# Rollback transaction when handling unexpected error
tx.session.rollback tx.transaction_id
tx.session.rollback tx.transaction_id if tx.existing_transaction?
# Return nil if raised with rollback.
return nil if e.is_a? Rollback
# Re-raise error.
raise e
ensure
Thread.current[:transaction_id] = nil
Thread.current[IS_TRANSACTION_RUNNING_KEY] = nil
end
end
end
Expand Down Expand Up @@ -1923,7 +1924,7 @@ def snapshot strong: nil, timestamp: nil, read_timestamp: nil,
exact_staleness: exact_staleness

ensure_service!
unless Thread.current[:transaction_id].nil?
unless Thread.current[IS_TRANSACTION_RUNNING_KEY].nil?
raise "Nested snapshots are not allowed"
end

Expand All @@ -1933,11 +1934,11 @@ def snapshot strong: nil, timestamp: nil, read_timestamp: nil,
timestamp: (timestamp || read_timestamp),
staleness: (staleness || exact_staleness),
call_options: call_options
Thread.current[:transaction_id] = snp_grpc.id
Thread.current[IS_TRANSACTION_RUNNING_KEY] = true
snp = Snapshot.from_grpc snp_grpc, session
yield snp if block_given?
ensure
Thread.current[:transaction_id] = nil
Thread.current[IS_TRANSACTION_RUNNING_KEY] = nil
end
nil
end
Expand Down Expand Up @@ -2272,6 +2273,18 @@ def delay_from_aborted err
nil
end

##
# Determines if a transaction error should be propagated to the user.
# And re-raises the error accordingly
def check_and_propagate_err! err, deadline_passed
raise err if internal_error_and_not_retryable? err
return unless deadline_passed
if err.is_a? GRPC::BadStatus
raise Google::Cloud::Error.from_error err
end
raise err
end

def internal_error_and_not_retryable? error
(error.instance_of?(Google::Cloud::InternalError) ||
error.instance_of?(GRPC::Internal)) &&
Expand Down