Skip to content

Commit

Permalink
Add Adapter#execute method to run PartiQL queries
Browse files Browse the repository at this point in the history
  • Loading branch information
andrykonchin committed Jan 22, 2023
1 parent a0b900e commit e015d51
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 1 deletion.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,21 @@ resolving the fields with a second query against the table since a query
against GSI then a query on base table is still likely faster than scan
on the base table*

### PartiQL

To run PartiQL statements `Dynamoid.adapter.execute` method should be
used:

```ruby
Dynamoid.adapter.execute("UPDATE users SET name = 'Mike' WHERE id = '1'")
```

Parameters are also supported:

```ruby
Dynamoid.adapter.execute("SELECT * FROM users WHERE id = ?", ['1'])
```

## Configuration

Listed below are all configuration options.
Expand Down
2 changes: 1 addition & 1 deletion lib/dynamoid/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def delete_table(table_name, options = {})
end
end

%i[batch_get_item delete_item get_item list_tables put_item truncate batch_write_item batch_delete_item].each do |m|
%i[batch_get_item delete_item get_item list_tables put_item truncate batch_write_item batch_delete_item execute].each do |m|
# Method delegation with benchmark to the underlying adapter. Faster than relying on method_missing.
#
# @since 0.2.0
Expand Down
23 changes: 23 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'aws_sdk_v3/query'
require_relative 'aws_sdk_v3/scan'
require_relative 'aws_sdk_v3/execute_statement'
require_relative 'aws_sdk_v3/create_table'
require_relative 'aws_sdk_v3/batch_get_item'
require_relative 'aws_sdk_v3/item_updater'
Expand Down Expand Up @@ -560,6 +561,28 @@ def count(table_name)
describe_table(table_name, true).item_count
end

# Run PartiQL query.
#
# Dynamoid.adapter.execute("SELECT * FROM users WHERE id = ?", ["758"])
#
# @param [String] statement PartiQL statement
# @param [Array] parameters a list of bind parameters
# @param [Hash] options
# @option [Boolean] consistent_read
# @return [[] | Array[Hash] | Enumerator::Lazy[Hash]] items when used a SELECT statement and empty Array otherwise
#
def execute(statement, parameters = [], options = {})
items = ExecuteStatement.new(client, statement, parameters, options).call

if items.is_a?(Array)
items
else
items.lazy.flat_map { |items| items }
end
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
[]
end

protected

#
Expand Down
62 changes: 62 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3/execute_statement.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Dynamoid
# @private
module AdapterPlugin
class AwsSdkV3
# Excecute a PartiQL query
#
# Documentation:
# - https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ExecuteStatement.html
# - https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDB/Client.html#execute_statement-instance_method
#
# NOTE: For reads result may be paginated. Only pagination with NextToken
# is implemented. Currently LastEvaluatedKey in response cannot be fed to
# ExecuteStatement to get the next page.
#
# See also:
# - https://repost.aws/questions/QUgNPbBYWiRoOlMsJv-XzrWg/how-to-use-last-evaluated-key-in-execute-statement-request
# - https://stackoverflow.com/questions/71438439/aws-dynamodb-executestatement-pagination
class ExecuteStatement
attr_reader :client, :statement, :parameters, :options

def initialize(client, statement, parameters, options)
@client = client
@statement = statement
@parameters = parameters
@options = options.symbolize_keys.slice(:consistent_read)
end

def call
request = {
statement: @statement,
parameters: @parameters,
consistent_read: @options[:consistent_read],
}

response = client.execute_statement(request)

if !response.next_token
return response_to_items(response)
end

Enumerator.new do |yielder|
yielder.yield(response_to_items(response))

while response.next_token
request[:next_token] = response.next_token
response = client.execute_statement(request)
yielder.yield(response_to_items(response))
end
end
end

private

def response_to_items(response)
response.items.map(&:symbolize_keys)
end
end
end
end
end
3 changes: 3 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3/middleware/limit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ def call(request)
# lower to obey limits. We can assume the difference won't be
# negative due to break statements below but choose smaller limit
# which is why we have 2 separate if statements.
#
# NOTE: Adjusting based on record_limit can cause many HTTP requests
# being made. We may want to change this behavior, but it affects
# filtering on data with potentially large gaps.
#
# Example:
# User.where('created_at.gte' => 1.day.ago).record_limit(1000)
# Records 1-999 User's that fit criteria
# Records 1000-2000 Users's that do not fit criteria
# Record 2001 fits criteria
#
# The underlying implementation will have 1 page for records 1-999
# then will request with limit 1 for records 1000-2000 (making 1000
# requests of limit 1) until hit record 2001.
Expand Down
90 changes: 90 additions & 0 deletions spec/dynamoid/adapter_plugin/aws_sdk_v3_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,96 @@ def dynamo_request(table_name, scan_hash = {}, select_opts = {})
end
end

describe '#execute' do
it 'executes a PartiQL query' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = '1'")

item = Dynamoid.adapter.get_item(test_table1, '1')
expect(item[:name]).to eql 'Mike'
end

it 'returns items for SELECT statement' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

items = Dynamoid.adapter.execute("SELECT * FROM #{test_table1}")
expect(items.size).to eql 1
expect(items).to eql [{id: '1', name: 'Josh'}]
end

it 'returns [] for statements other than SELECT' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

response = Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = '1'")
expect(response).to eql []

response = Dynamoid.adapter.execute("INSERT INTO #{test_table1} VALUE { 'id': '2' }")
expect(response).to eql []

response = Dynamoid.adapter.execute("DELETE FROM #{test_table1} WHERE id = '1'")
expect(response).to eql []
end

it 'accepts bind parameters as array of values' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = ?", ['1'])

item = Dynamoid.adapter.get_item(test_table1, '1')
expect(item[:name]).to eql 'Mike'
end

it "returns [] when WHERE condition evaluated to false" do
expect(Dynamoid.adapter.scan_count(test_table1)).to eql 0

response = Dynamoid.adapter.execute("SELECT * FROM #{test_table1} WHERE id = '1'")
expect(response.to_a).to eql []

response = Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = '1'")
expect(response.to_a).to eql []

response = Dynamoid.adapter.execute("DELETE FROM #{test_table1} WHERE id = '1'")
expect(response.to_a).to eql []
end

it 'accepts :consistent_read option' do
expect(Dynamoid.adapter.client).to receive(:execute_statement)
.with(including(consistent_read: true))
.and_call_original

Dynamoid.adapter.execute("SELECT * FROM #{test_table1} WHERE id = '1'", [], consistent_read: true)

expect(Dynamoid.adapter.client).to receive(:execute_statement)
.with(including(consistent_read: false))
.and_call_original

Dynamoid.adapter.execute("SELECT * FROM #{test_table1} WHERE id = '1'", [], consistent_read: false)
end

it 'loads lazily all the pages of a paginated result' do
next_token = double('next-token')
obj1 = { 'attribute1' => 1 }
obj2 = { 'attribute2' => 2 }
obj3 = { 'attribute3' => 3 }
obj4 = { 'attribute4' => 4 }
response1 = double('response-1', next_token: next_token, items: [obj1, obj2])
response2 = double('response-1', next_token: nil, items: [obj3, obj4])

expect(Dynamoid.adapter.client).to receive(:execute_statement)
.and_return(response1, response2)

items = Dynamoid.adapter.execute("PartlySQL statement")
expect(items).to be_a(Enumerator::Lazy)
expect(items.to_a).to eql [
{ attribute1: 1 },
{ attribute2: 2 },
{ attribute3: 3 },
{ attribute4: 4 }
]
end
end

# connection_config
describe '#connectin_config' do
subject { described_class.new.connection_config }
Expand Down

0 comments on commit e015d51

Please sign in to comment.