Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to run PartiQL queries #630

Merged
merged 4 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,21 @@ resolving the fields with a second query against the table since a query
against GSI then a query on base table is still likely faster than scan
on the base table*

### PartiQL

To run PartiQL statements `Dynamoid.adapter.execute` method should be
used:

```ruby
Dynamoid.adapter.execute("UPDATE users SET name = 'Mike' WHERE id = '1'")
```

Parameters are also supported:

```ruby
Dynamoid.adapter.execute('SELECT * FROM users WHERE id = ?', ['1'])
```

## Configuration

Listed below are all configuration options.
Expand Down
2 changes: 2 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

require 'bundler/setup'
require 'dynamoid'
require 'dynamoid/log/formatter'

# You can add fixtures and/or initialization code here to make experimenting
# with your gem easier. You can also use a different console, if you like.
Expand All @@ -16,6 +17,7 @@ Dynamoid.configure do |config|
config.secret_key = 'REPLACE_WITH_SECRET_ACCESS_KEY'
config.region = 'us-west-2'
config.endpoint = 'http://localhost:8000'
config.log_formatter = Dynamoid::Log::Formatter::Compact.new
end

require 'irb'
Expand Down
2 changes: 1 addition & 1 deletion lib/dynamoid/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def delete_table(table_name, options = {})
end
end

%i[batch_get_item delete_item get_item list_tables put_item truncate batch_write_item batch_delete_item].each do |m|
%i[batch_get_item delete_item get_item list_tables put_item truncate batch_write_item batch_delete_item execute].each do |m|
# Method delegation with benchmark to the underlying adapter. Faster than relying on method_missing.
#
# @since 0.2.0
Expand Down
37 changes: 29 additions & 8 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'aws_sdk_v3/query'
require_relative 'aws_sdk_v3/scan'
require_relative 'aws_sdk_v3/execute_statement'
require_relative 'aws_sdk_v3/create_table'
require_relative 'aws_sdk_v3/batch_get_item'
require_relative 'aws_sdk_v3/item_updater'
Expand Down Expand Up @@ -393,7 +394,7 @@ def get_item(table_name, key, options = {})
item = client.get_item(table_name: table_name,
key: key_stanza(table, key, range_key),
consistent_read: consistent_read)[:item]
item ? result_item_to_hash(item) : nil
item ? item_to_hash(item) : nil
end

# Edits an existing item's attributes, or adds a new item to the table if it does not already exist. You can put, delete, or add attribute values
Expand Down Expand Up @@ -422,7 +423,7 @@ def update_item(table_name, key, options = {})
attribute_updates: item_updater.attribute_updates,
expected: expected_stanza(conditions),
return_values: 'ALL_NEW')
result_item_to_hash(result[:attributes])
item_to_hash(result[:attributes])
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
raise Dynamoid::Errors::ConditionalCheckFailedException, e
end
Expand Down Expand Up @@ -489,7 +490,7 @@ def query(table_name, options = {})

Query.new(client, table, options).call.each do |page|
yielder.yield(
page.items.map { |row| result_item_to_hash(row) },
page.items.map { |item| item_to_hash(item) },
last_evaluated_key: page.last_evaluated_key
)
end
Expand Down Expand Up @@ -522,7 +523,7 @@ def scan(table_name, conditions = {}, options = {})

Scan.new(client, table, conditions, options).call.each do |page|
yielder.yield(
page.items.map { |row| result_item_to_hash(row) },
page.items.map { |item| item_to_hash(item) },
last_evaluated_key: page.last_evaluated_key
)
end
Expand Down Expand Up @@ -560,6 +561,28 @@ def count(table_name)
describe_table(table_name, true).item_count
end

# Run PartiQL query.
#
# Dynamoid.adapter.execute("SELECT * FROM users WHERE id = ?", ["758"])
#
# @param [String] statement PartiQL statement
# @param [Array] parameters a list of bind parameters
# @param [Hash] options
# @option [Boolean] consistent_read
# @return [[] | Array[Hash] | Enumerator::Lazy[Hash]] items when used a SELECT statement and empty Array otherwise
#
def execute(statement, parameters = [], options = {})
items = ExecuteStatement.new(client, statement, parameters, options).call

if items.is_a?(Array)
items
else
items.lazy.flat_map { |array| array }
end
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
[]
end

protected

#
Expand Down Expand Up @@ -605,10 +628,8 @@ def describe_table(table_name, reload = false)
#
# Converts a hash returned by get_item, scan, etc. into a key-value hash
#
def result_item_to_hash(item)
{}.tap do |r|
item.each { |k, v| r[k.to_sym] = v }
end
def item_to_hash(hash)
hash.symbolize_keys
end

def sanitize_item(attributes)
Expand Down
62 changes: 62 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3/execute_statement.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Dynamoid
# @private
module AdapterPlugin
class AwsSdkV3
# Excecute a PartiQL query
#
# Documentation:
# - https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ExecuteStatement.html
# - https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/DynamoDB/Client.html#execute_statement-instance_method
#
# NOTE: For reads result may be paginated. Only pagination with NextToken
# is implemented. Currently LastEvaluatedKey in response cannot be fed to
# ExecuteStatement to get the next page.
#
# See also:
# - https://repost.aws/questions/QUgNPbBYWiRoOlMsJv-XzrWg/how-to-use-last-evaluated-key-in-execute-statement-request
# - https://stackoverflow.com/questions/71438439/aws-dynamodb-executestatement-pagination
class ExecuteStatement
attr_reader :client, :statement, :parameters, :options

def initialize(client, statement, parameters, options)
@client = client
@statement = statement
@parameters = parameters
@options = options.symbolize_keys.slice(:consistent_read)
end

def call
request = {
statement: @statement,
parameters: @parameters,
consistent_read: @options[:consistent_read],
}

response = client.execute_statement(request)

unless response.next_token
return response_to_items(response)
end

Enumerator.new do |yielder|
yielder.yield(response_to_items(response))

while response.next_token
request[:next_token] = response.next_token
response = client.execute_statement(request)
yielder.yield(response_to_items(response))
end
end
end

private

def response_to_items(response)
response.items.map(&:symbolize_keys)
end
end
end
end
end
3 changes: 3 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3/middleware/limit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ def call(request)
# lower to obey limits. We can assume the difference won't be
# negative due to break statements below but choose smaller limit
# which is why we have 2 separate if statements.
#
# NOTE: Adjusting based on record_limit can cause many HTTP requests
# being made. We may want to change this behavior, but it affects
# filtering on data with potentially large gaps.
#
# Example:
# User.where('created_at.gte' => 1.day.ago).record_limit(1000)
# Records 1-999 User's that fit criteria
# Records 1000-2000 Users's that do not fit criteria
# Record 2001 fits criteria
#
# The underlying implementation will have 1 page for records 1-999
# then will request with limit 1 for records 1000-2000 (making 1000
# requests of limit 1) until hit record 2001.
Expand Down
20 changes: 17 additions & 3 deletions lib/dynamoid/log/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

module Dynamoid
module Log
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Log/Formatter.html
# https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Seahorse/Client/Response.html
# https://aws.amazon.com/ru/blogs/developer/logging-requests/
module Formatter
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Log/Formatter.html
# https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Seahorse/Client/Response.html
# https://aws.amazon.com/ru/blogs/developer/logging-requests/
class Debug
def format(response)
bold = "\x1b[1m"
Expand All @@ -22,6 +22,20 @@ def format(response)
].join("\n")
end
end

class Compact
def format(response)
bold = "\x1b[1m"
reset = "\x1b[0m"

[
response.context.operation.name,
bold,
response.context.http_request.body.string,
reset
].join(' ')
end
end
end
end
end
90 changes: 90 additions & 0 deletions spec/dynamoid/adapter_plugin/aws_sdk_v3_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,96 @@ def dynamo_request(table_name, scan_hash = {}, select_opts = {})
end
end

describe '#execute' do
it 'executes a PartiQL query' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = '1'")

item = Dynamoid.adapter.get_item(test_table1, '1')
expect(item[:name]).to eql 'Mike'
end

it 'returns items for SELECT statement' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

items = Dynamoid.adapter.execute("SELECT * FROM #{test_table1}")
expect(items.size).to eql 1
expect(items).to eql [{ id: '1', name: 'Josh' }]
end

it 'returns [] for statements other than SELECT' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

response = Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = '1'")
expect(response).to eql []

response = Dynamoid.adapter.execute("INSERT INTO #{test_table1} VALUE { 'id': '2' }")
expect(response).to eql []

response = Dynamoid.adapter.execute("DELETE FROM #{test_table1} WHERE id = '1'")
expect(response).to eql []
end

it 'accepts bind parameters as array of values' do
Dynamoid.adapter.put_item(test_table1, id: '1', name: 'Josh')

Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = ?", ['1'])

item = Dynamoid.adapter.get_item(test_table1, '1')
expect(item[:name]).to eql 'Mike'
end

it 'returns [] when WHERE condition evaluated to false' do
expect(Dynamoid.adapter.scan_count(test_table1)).to eql 0

response = Dynamoid.adapter.execute("SELECT * FROM #{test_table1} WHERE id = '1'")
expect(response.to_a).to eql []

response = Dynamoid.adapter.execute("UPDATE #{test_table1} SET name = 'Mike' WHERE id = '1'")
expect(response.to_a).to eql []

response = Dynamoid.adapter.execute("DELETE FROM #{test_table1} WHERE id = '1'")
expect(response.to_a).to eql []
end

it 'accepts :consistent_read option' do
expect(Dynamoid.adapter.client).to receive(:execute_statement)
.with(including(consistent_read: true))
.and_call_original

Dynamoid.adapter.execute("SELECT * FROM #{test_table1} WHERE id = '1'", [], consistent_read: true)

expect(Dynamoid.adapter.client).to receive(:execute_statement)
.with(including(consistent_read: false))
.and_call_original

Dynamoid.adapter.execute("SELECT * FROM #{test_table1} WHERE id = '1'", [], consistent_read: false)
end

it 'loads lazily all the pages of a paginated result' do
next_token = double('next-token')
obj1 = { 'attribute1' => 1 }
obj2 = { 'attribute2' => 2 }
obj3 = { 'attribute3' => 3 }
obj4 = { 'attribute4' => 4 }
response1 = double('response-1', next_token: next_token, items: [obj1, obj2])
response2 = double('response-1', next_token: nil, items: [obj3, obj4])

expect(Dynamoid.adapter.client).to receive(:execute_statement)
.and_return(response1, response2)

items = Dynamoid.adapter.execute('PartlySQL statement')
expect(items).to be_a(Enumerator::Lazy)
expect(items.to_a).to eql [
{ attribute1: 1 },
{ attribute2: 2 },
{ attribute3: 3 },
{ attribute4: 4 }
]
end
end

# connection_config
describe '#connectin_config' do
subject { described_class.new.connection_config }
Expand Down