Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Spanner): support max_commit_delay for commit_options #85

Merged
merged 7 commits into from Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions google-cloud-spanner/acceptance/spanner/client/crud_test.rb
Expand Up @@ -135,6 +135,37 @@
_(results.timestamp).wont_be :nil?
end

it "commits with max commit delay for #{dialect}" do
skip if emulator_enabled?

commit_options = { return_commit_stats: true, max_commit_delay: 120 }
commit_resp = db[dialect].commit commit_options: commit_options do |c|
c.insert "accounts", @default_rows[dialect][0]
end

assert_commit_response commit_resp, commit_options

results = db[dialect].read "accounts", ["account_id"], single_use: { timestamp: commit_resp.timestamp }
_(results.rows.count).must_equal 1
_(results.timestamp).wont_be :nil?

commit_resp = db[dialect].commit commit_options: commit_options do |c|
c.upsert "accounts", @default_rows[dialect][0]
end

assert_commit_response commit_resp, commit_options

commit_resp = db[dialect].commit commit_options: commit_options do |c|
c.delete "accounts", [1]
end

assert_commit_response commit_resp, commit_options

results = db[dialect].read "accounts", ["account_id"], single_use: { timestamp: commit_resp.timestamp }
_(results.rows.count).must_equal 0
_(results.timestamp).wont_be :nil?
end

it "inserts, updates, upserts, reads, and deletes records in a transaction for #{dialect}" do
@setup_timestamp[dialect]
active_count_sql = "SELECT COUNT(*) AS count FROM accounts WHERE active = true"
Expand Down
28 changes: 28 additions & 0 deletions google-cloud-spanner/lib/google/cloud/spanner/client.rb
Expand Up @@ -1022,6 +1022,10 @@ def read table, columns, keys: nil, index: nil, limit: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1141,6 +1145,10 @@ def upsert table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1259,6 +1267,10 @@ def insert table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1378,6 +1390,10 @@ def update table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1475,6 +1491,10 @@ def replace table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1579,6 +1599,10 @@ def delete table, keys = [], commit_options: nil, request_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1706,6 +1730,10 @@ def commit commit_options: nil, request_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down
3 changes: 2 additions & 1 deletion google-cloud-spanner/lib/google/cloud/spanner/convert.rb
Expand Up @@ -242,8 +242,9 @@ def row_to_object row_types, row
Hash[row_to_pairs(row_types, row)]
end

def number_to_duration number
def number_to_duration number, millisecond: false
return nil if number.nil?
number = number/1000.to_f if millisecond

Google::Protobuf::Duration.new \
seconds: number.to_i,
Expand Down
19 changes: 16 additions & 3 deletions google-cloud-spanner/lib/google/cloud/spanner/service.rb
Expand Up @@ -453,13 +453,26 @@ def commit session_name, mutations = [], transaction_id: nil,
request_options: request_options
}

if commit_options
request[:return_commit_stats] = commit_options[:return_commit_stats]
end
request = add_commit_options request, commit_options

service.commit request, opts
end

def add_commit_options request, commit_options
if commit_options
if commit_options.key? :return_commit_stats
request[:return_commit_stats] =
commit_options[:return_commit_stats]
end
if commit_options.key? :max_commit_delay
request[:max_commit_delay] =
Convert.number_to_duration(commit_options[:max_commit_delay],
millisecond: true)
end
end
request
end

def rollback session_name, transaction_id, call_options: nil
route_to_leader = LARHeaders.rollback
opts = default_options session_name: session_name,
Expand Down
31 changes: 25 additions & 6 deletions google-cloud-spanner/lib/google/cloud/spanner/session.rb
Expand Up @@ -569,8 +569,11 @@ def partition_read table, columns, transaction, keys: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
#
# transaction. Default it is `false`.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -686,7 +689,10 @@ def commit transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -794,7 +800,10 @@ def upsert table, *rows, transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -901,7 +910,10 @@ def insert table, *rows, transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -1009,7 +1021,11 @@ def update table, *rows, transaction_id: nil, commit_options: nil,
#
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`.
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
#
# @param [Hash] request_options Common request options.
#
Expand Down Expand Up @@ -1099,7 +1115,10 @@ def replace table, *rows, transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down
Expand Up @@ -439,7 +439,7 @@
)]
mock = Minitest::Mock.new
mock.expect :create_session, session_grpc, [{database: database_path(instance_id, database_id), session: nil}, default_options]
mock.expect :commit, commit_stats_resp_grpc, [{ session: session_grpc.name, mutations: mutations, transaction_id: nil, single_use_transaction: tx_opts, return_commit_stats: true, request_options: nil }, default_options]
mock.expect :commit, commit_stats_resp_grpc, [{ session: session_grpc.name, mutations: mutations, transaction_id: nil, single_use_transaction: tx_opts, return_commit_stats: true, request_options: nil}, default_options]
spanner.service.mocked_service = mock

commit_resp = client.commit commit_options: commit_options do |c|
Expand Down Expand Up @@ -563,6 +563,30 @@
shutdown_client! client
mock.verify
end

it "commits with max_commit_delay" do
mutations = [Google::Cloud::Spanner::V1::Mutation.new(
update: Google::Cloud::Spanner::V1::Mutation::Write.new(
table: "users", columns: %w(id name active),
values: [Google::Cloud::Spanner::Convert.object_to_grpc_value([1, "Charlie", false]).list_value]
)
)]
commit_options[:max_commit_delay] = 120
commit_delay_duration = Google::Cloud::Spanner::Convert.number_to_duration(120, millisecond: true)
mock = Minitest::Mock.new
mock.expect :create_session, session_grpc, [{database: database_path(instance_id, database_id), session: nil}, default_options]
mock.expect :commit, commit_stats_resp_grpc, [{ session: session_grpc.name, mutations: mutations, transaction_id: nil, single_use_transaction: tx_opts, return_commit_stats: true, max_commit_delay: commit_delay_duration, request_options: nil}, default_options]
spanner.service.mocked_service = mock

commit_resp = client.commit commit_options: commit_options do |c|
c.update "users", [{ id: 1, name: "Charlie", active: false }]
end

assert_commit_response commit_resp, commit_stats_resp_grpc

shutdown_client! client
mock.verify
end
end

describe "priority request options" do
Expand Down
Expand Up @@ -29,6 +29,14 @@
_(duration.nanos).must_equal 0
end

it "converts from millisecond" do
number = 100
duration = Google::Cloud::Spanner::Convert.number_to_duration number, millisecond: true
_(duration).must_be_kind_of Google::Protobuf::Duration
_(duration.seconds).must_equal 0
_(duration.nanos).must_equal 100_000_000
end

it "converts a negative Integer" do
number = -42
duration = Google::Cloud::Spanner::Convert.number_to_duration number
Expand Down