Skip to content

Commit

Permalink
feat(Spanner): support max_commit_delay for commit_options (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
NivedhaSenthil committed Feb 7, 2024
1 parent 2ada04c commit 3b73dfc
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 11 deletions.
31 changes: 31 additions & 0 deletions google-cloud-spanner/acceptance/spanner/client/crud_test.rb
Expand Up @@ -135,6 +135,37 @@
_(results.timestamp).wont_be :nil?
end

it "commits with max commit delay for #{dialect}" do
skip if emulator_enabled?

commit_options = { return_commit_stats: true, max_commit_delay: 120 }
commit_resp = db[dialect].commit commit_options: commit_options do |c|
c.insert "accounts", @default_rows[dialect][0]
end

assert_commit_response commit_resp, commit_options

results = db[dialect].read "accounts", ["account_id"], single_use: { timestamp: commit_resp.timestamp }
_(results.rows.count).must_equal 1
_(results.timestamp).wont_be :nil?

commit_resp = db[dialect].commit commit_options: commit_options do |c|
c.upsert "accounts", @default_rows[dialect][0]
end

assert_commit_response commit_resp, commit_options

commit_resp = db[dialect].commit commit_options: commit_options do |c|
c.delete "accounts", [1]
end

assert_commit_response commit_resp, commit_options

results = db[dialect].read "accounts", ["account_id"], single_use: { timestamp: commit_resp.timestamp }
_(results.rows.count).must_equal 0
_(results.timestamp).wont_be :nil?
end

it "inserts, updates, upserts, reads, and deletes records in a transaction for #{dialect}" do
@setup_timestamp[dialect]
active_count_sql = "SELECT COUNT(*) AS count FROM accounts WHERE active = true"
Expand Down
28 changes: 28 additions & 0 deletions google-cloud-spanner/lib/google/cloud/spanner/client.rb
Expand Up @@ -1022,6 +1022,10 @@ def read table, columns, keys: nil, index: nil, limit: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1141,6 +1145,10 @@ def upsert table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1259,6 +1267,10 @@ def insert table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1378,6 +1390,10 @@ def update table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1475,6 +1491,10 @@ def replace table, rows, commit_options: nil, request_options: nil
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1579,6 +1599,10 @@ def delete table, keys = [], commit_options: nil, request_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down Expand Up @@ -1706,6 +1730,10 @@ def commit commit_options: nil, request_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:priority` (String) The relative priority for requests.
Expand Down
3 changes: 2 additions & 1 deletion google-cloud-spanner/lib/google/cloud/spanner/convert.rb
Expand Up @@ -242,8 +242,9 @@ def row_to_object row_types, row
Hash[row_to_pairs(row_types, row)]
end

def number_to_duration number
def number_to_duration number, millisecond: false
return nil if number.nil?
number = number/1000.to_f if millisecond

Google::Protobuf::Duration.new \
seconds: number.to_i,
Expand Down
19 changes: 16 additions & 3 deletions google-cloud-spanner/lib/google/cloud/spanner/service.rb
Expand Up @@ -453,13 +453,26 @@ def commit session_name, mutations = [], transaction_id: nil,
request_options: request_options
}

if commit_options
request[:return_commit_stats] = commit_options[:return_commit_stats]
end
request = add_commit_options request, commit_options

service.commit request, opts
end

def add_commit_options request, commit_options
if commit_options
if commit_options.key? :return_commit_stats
request[:return_commit_stats] =
commit_options[:return_commit_stats]
end
if commit_options.key? :max_commit_delay
request[:max_commit_delay] =
Convert.number_to_duration(commit_options[:max_commit_delay],
millisecond: true)
end
end
request
end

def rollback session_name, transaction_id, call_options: nil
route_to_leader = LARHeaders.rollback
opts = default_options session_name: session_name,
Expand Down
31 changes: 25 additions & 6 deletions google-cloud-spanner/lib/google/cloud/spanner/session.rb
Expand Up @@ -569,8 +569,11 @@ def partition_read table, columns, transaction, keys: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
#
# transaction. Default it is `false`.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -686,7 +689,10 @@ def commit transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -794,7 +800,10 @@ def upsert table, *rows, transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -901,7 +910,10 @@ def insert table, *rows, transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down Expand Up @@ -1009,7 +1021,11 @@ def update table, *rows, transaction_id: nil, commit_options: nil,
#
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`.
# {CommitResponse}. Default value is `false`
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
#
# @param [Hash] request_options Common request options.
#
Expand Down Expand Up @@ -1099,7 +1115,10 @@ def replace table, *rows, transaction_id: nil, commit_options: nil,
# * `:return_commit_stats` (Boolean) A boolean value. If `true`,
# then statistics related to the transaction will be included in
# {CommitResponse}. Default value is `false`
#
# * `:maxCommitDelay` (Numeric) The amount of latency in millisecond in this request
# is willing to incur in order to improve throughput.
# The commit delay must be at least 0ms and at most 500ms.
# Default value is nil.
# @param [Hash] request_options Common request options.
#
# * `:request_tag` (String) A per-request tag which can be applied
Expand Down
Expand Up @@ -439,7 +439,7 @@
)]
mock = Minitest::Mock.new
mock.expect :create_session, session_grpc, [{database: database_path(instance_id, database_id), session: nil}, default_options]
mock.expect :commit, commit_stats_resp_grpc, [{ session: session_grpc.name, mutations: mutations, transaction_id: nil, single_use_transaction: tx_opts, return_commit_stats: true, request_options: nil }, default_options]
mock.expect :commit, commit_stats_resp_grpc, [{ session: session_grpc.name, mutations: mutations, transaction_id: nil, single_use_transaction: tx_opts, return_commit_stats: true, request_options: nil}, default_options]
spanner.service.mocked_service = mock

commit_resp = client.commit commit_options: commit_options do |c|
Expand Down Expand Up @@ -563,6 +563,30 @@
shutdown_client! client
mock.verify
end

it "commits with max_commit_delay" do
mutations = [Google::Cloud::Spanner::V1::Mutation.new(
update: Google::Cloud::Spanner::V1::Mutation::Write.new(
table: "users", columns: %w(id name active),
values: [Google::Cloud::Spanner::Convert.object_to_grpc_value([1, "Charlie", false]).list_value]
)
)]
commit_options[:max_commit_delay] = 120
commit_delay_duration = Google::Cloud::Spanner::Convert.number_to_duration(120, millisecond: true)
mock = Minitest::Mock.new
mock.expect :create_session, session_grpc, [{database: database_path(instance_id, database_id), session: nil}, default_options]
mock.expect :commit, commit_stats_resp_grpc, [{ session: session_grpc.name, mutations: mutations, transaction_id: nil, single_use_transaction: tx_opts, return_commit_stats: true, max_commit_delay: commit_delay_duration, request_options: nil}, default_options]
spanner.service.mocked_service = mock

commit_resp = client.commit commit_options: commit_options do |c|
c.update "users", [{ id: 1, name: "Charlie", active: false }]
end

assert_commit_response commit_resp, commit_stats_resp_grpc

shutdown_client! client
mock.verify
end
end

describe "priority request options" do
Expand Down
Expand Up @@ -29,6 +29,14 @@
_(duration.nanos).must_equal 0
end

it "converts from millisecond" do
number = 100
duration = Google::Cloud::Spanner::Convert.number_to_duration number, millisecond: true
_(duration).must_be_kind_of Google::Protobuf::Duration
_(duration.seconds).must_equal 0
_(duration.nanos).must_equal 100_000_000
end

it "converts a negative Integer" do
number = -42
duration = Google::Cloud::Spanner::Convert.number_to_duration number
Expand Down

0 comments on commit 3b73dfc

Please sign in to comment.