-
-
Notifications
You must be signed in to change notification settings - Fork 197
/
aws_sdk_v3.rb
674 lines (604 loc) · 28.8 KB
/
aws_sdk_v3.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
# frozen_string_literal: true
require_relative 'aws_sdk_v3/query'
require_relative 'aws_sdk_v3/scan'
require_relative 'aws_sdk_v3/execute_statement'
require_relative 'aws_sdk_v3/create_table'
require_relative 'aws_sdk_v3/batch_get_item'
require_relative 'aws_sdk_v3/item_updater'
require_relative 'aws_sdk_v3/table'
require_relative 'aws_sdk_v3/until_past_table_status'
module Dynamoid
# @private
module AdapterPlugin
# The AwsSdkV3 adapter provides support for version 3 of the AWS SDK for Ruby.
# Note: Don't use keyword arguments in public methods, because method
# calls on the adapter are delegated to the plugin.
#
# There are breaking changes in Ruby related to delegating keyword
# arguments, so we have decided to simply avoid them when using delegation.
#
# https://eregon.me/blog/2019/11/10/the-delegation-challenge-of-ruby27.html
class AwsSdkV3
# String identifiers that are not referenced by the methods visible in this
# class body; presumably they mirror DynamoDB's comparison operator, key type
# and attribute type codes and are used by the required helper files
# (TODO confirm).
EQ = 'EQ'
HASH_KEY = 'HASH'
RANGE_KEY = 'RANGE'
STRING_TYPE = 'S'
NUM_TYPE = 'N'
BINARY_TYPE = 'B'
# Maps a symbolic table status to the status string compared against the
# value extracted by PARSE_TABLE_STATUS (see #delete_table).
TABLE_STATUSES = {
creating: 'CREATING',
updating: 'UPDATING',
deleting: 'DELETING',
active: 'ACTIVE'
}.freeze
# Extracts `table_status` from a response object. `lookup` names the method
# of the response that holds the table description; it defaults to :table.
PARSE_TABLE_STATUS = lambda { |resp, lookup = :table|
# lookup is :table for the describe_table API
# lookup is :table_description for the create_table API
# (#delete_table in this class also passes :table_description),
# because the two response shapes differ.
resp.send(lookup).table_status
}
# Number of requests sent per batch by #batch_write_item and #batch_delete_item.
BATCH_WRITE_ITEM_REQUESTS_LIMIT = 25
# Dynamoid::Config settings that #connection_config copies into the client
# configuration when they are set to a non-nil value.
CONNECTION_CONFIG_OPTIONS = %i[endpoint region http_continue_timeout http_idle_timeout http_open_timeout http_read_timeout].freeze
# Frozen Set of reserved words, stored as upper-case symbols.
# It is not referenced by the methods visible in this class body.
# See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html
#
# NOTE(review): entries such as FLATTERN, INNTER and LOGED look like typos
# but presumably mirror the upstream list linked above - confirm before
# "correcting" them.
RESERVED_WORDS = Set.new(
%i[
ABORT ABSOLUTE ACTION ADD AFTER AGENT AGGREGATE ALL ALLOCATE ALTER ANALYZE
AND ANY ARCHIVE ARE ARRAY AS ASC ASCII ASENSITIVE ASSERTION ASYMMETRIC AT
ATOMIC ATTACH ATTRIBUTE AUTH AUTHORIZATION AUTHORIZE AUTO AVG BACK BACKUP
BASE BATCH BEFORE BEGIN BETWEEN BIGINT BINARY BIT BLOB BLOCK BOOLEAN BOTH
BREADTH BUCKET BULK BY BYTE CALL CALLED CALLING CAPACITY CASCADE CASCADED
CASE CAST CATALOG CHAR CHARACTER CHECK CLASS CLOB CLOSE CLUSTER CLUSTERED
CLUSTERING CLUSTERS COALESCE COLLATE COLLATION COLLECTION COLUMN COLUMNS
COMBINE COMMENT COMMIT COMPACT COMPILE COMPRESS CONDITION CONFLICT CONNECT
CONNECTION CONSISTENCY CONSISTENT CONSTRAINT CONSTRAINTS CONSTRUCTOR
CONSUMED CONTINUE CONVERT COPY CORRESPONDING COUNT COUNTER CREATE CROSS
CUBE CURRENT CURSOR CYCLE DATA DATABASE DATE DATETIME DAY DEALLOCATE DEC
DECIMAL DECLARE DEFAULT DEFERRABLE DEFERRED DEFINE DEFINED DEFINITION
DELETE DELIMITED DEPTH DEREF DESC DESCRIBE DESCRIPTOR DETACH DETERMINISTIC
DIAGNOSTICS DIRECTORIES DISABLE DISCONNECT DISTINCT DISTRIBUTE DO DOMAIN
DOUBLE DROP DUMP DURATION DYNAMIC EACH ELEMENT ELSE ELSEIF EMPTY ENABLE
END EQUAL EQUALS ERROR ESCAPE ESCAPED EVAL EVALUATE EXCEEDED EXCEPT
EXCEPTION EXCEPTIONS EXCLUSIVE EXEC EXECUTE EXISTS EXIT EXPLAIN EXPLODE
EXPORT EXPRESSION EXTENDED EXTERNAL EXTRACT FAIL FALSE FAMILY FETCH FIELDS
FILE FILTER FILTERING FINAL FINISH FIRST FIXED FLATTERN FLOAT FOR FORCE
FOREIGN FORMAT FORWARD FOUND FREE FROM FULL FUNCTION FUNCTIONS GENERAL
GENERATE GET GLOB GLOBAL GO GOTO GRANT GREATER GROUP GROUPING HANDLER HASH
HAVE HAVING HEAP HIDDEN HOLD HOUR IDENTIFIED IDENTITY IF IGNORE IMMEDIATE
IMPORT IN INCLUDING INCLUSIVE INCREMENT INCREMENTAL INDEX INDEXED INDEXES
INDICATOR INFINITE INITIALLY INLINE INNER INNTER INOUT INPUT INSENSITIVE
INSERT INSTEAD INT INTEGER INTERSECT INTERVAL INTO INVALIDATE IS ISOLATION
ITEM ITEMS ITERATE JOIN KEY KEYS LAG LANGUAGE LARGE LAST LATERAL LEAD
LEADING LEAVE LEFT LENGTH LESS LEVEL LIKE LIMIT LIMITED LINES LIST LOAD
LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCATOR LOCK LOCKS LOG LOGED LONG
LOOP LOWER MAP MATCH MATERIALIZED MAX MAXLEN MEMBER MERGE METHOD METRICS
MIN MINUS MINUTE MISSING MOD MODE MODIFIES MODIFY MODULE MONTH MULTI
MULTISET NAME NAMES NATIONAL NATURAL NCHAR NCLOB NEW NEXT NO NONE NOT NULL
NULLIF NUMBER NUMERIC OBJECT OF OFFLINE OFFSET OLD ON ONLINE ONLY OPAQUE
OPEN OPERATOR OPTION OR ORDER ORDINALITY OTHER OTHERS OUT OUTER OUTPUT
OVER OVERLAPS OVERRIDE OWNER PAD PARALLEL PARAMETER PARAMETERS PARTIAL
PARTITION PARTITIONED PARTITIONS PATH PERCENT PERCENTILE PERMISSION
PERMISSIONS PIPE PIPELINED PLAN POOL POSITION PRECISION PREPARE PRESERVE
PRIMARY PRIOR PRIVATE PRIVILEGES PROCEDURE PROCESSED PROJECT PROJECTION
PROPERTY PROVISIONING PUBLIC PUT QUERY QUIT QUORUM RAISE RANDOM RANGE RANK
RAW READ READS REAL REBUILD RECORD RECURSIVE REDUCE REF REFERENCE
REFERENCES REFERENCING REGEXP REGION REINDEX RELATIVE RELEASE REMAINDER
RENAME REPEAT REPLACE REQUEST RESET RESIGNAL RESOURCE RESPONSE RESTORE
RESTRICT RESULT RETURN RETURNING RETURNS REVERSE REVOKE RIGHT ROLE ROLES
ROLLBACK ROLLUP ROUTINE ROW ROWS RULE RULES SAMPLE SATISFIES SAVE SAVEPOINT
SCAN SCHEMA SCOPE SCROLL SEARCH SECOND SECTION SEGMENT SEGMENTS SELECT SELF
SEMI SENSITIVE SEPARATE SEQUENCE SERIALIZABLE SESSION SET SETS SHARD SHARE
SHARED SHORT SHOW SIGNAL SIMILAR SIZE SKEWED SMALLINT SNAPSHOT SOME SOURCE
SPACE SPACES SPARSE SPECIFIC SPECIFICTYPE SPLIT SQL SQLCODE SQLERROR
SQLEXCEPTION SQLSTATE SQLWARNING START STATE STATIC STATUS STORAGE STORE
STORED STREAM STRING STRUCT STYLE SUB SUBMULTISET SUBPARTITION SUBSTRING
SUBTYPE SUM SUPER SYMMETRIC SYNONYM SYSTEM TABLE TABLESAMPLE TEMP TEMPORARY
TERMINATED TEXT THAN THEN THROUGHPUT TIME TIMESTAMP TIMEZONE TINYINT TO
TOKEN TOTAL TOUCH TRAILING TRANSACTION TRANSFORM TRANSLATE TRANSLATION
TREAT TRIGGER TRIM TRUE TRUNCATE TTL TUPLE TYPE UNDER UNDO UNION UNIQUE UNIT
UNKNOWN UNLOGGED UNNEST UNPROCESSED UNSIGNED UNTIL UPDATE UPPER URL USAGE
USE USER USERS USING UUID VACUUM VALUE VALUED VALUES VARCHAR VARIABLE
VARIANCE VARINT VARYING VIEW VIEWS VIRTUAL VOID WAIT WHEN WHENEVER WHERE
WHILE WINDOW WITH WITHIN WITHOUT WORK WRAPPED WRITE YEAR ZONE
]
).freeze
# Hash of Table objects keyed by table name. It is reset to an empty hash by
# #connect!, filled by #describe_table and pruned by #delete_table.
attr_reader :table_cache
# Establish the connection to DynamoDB and reset the table cache.
#
# Builds a new Aws::DynamoDB::Client from #connection_config and stores it
# in @client (exposed through #client), then replaces the table cache with
# an empty hash.
#
# @return [Hash] the new, empty table cache - the value of the last
#   assignment - and NOT the client; use #client to obtain the client
def connect!
@client = Aws::DynamoDB::Client.new(connection_config)
@table_cache = {}
end
# Build the option hash handed to Aws::DynamoDB::Client.new.
#
# The hash is assembled from Dynamoid::Config:
# * every option listed in CONNECTION_CONFIG_OPTIONS that has a non-nil setting
# * either the ready-made credentials object or, when none is configured,
#   the access key / secret key (each only when set)
# * the logger, a fixed :debug log level and, when configured, the log formatter
#
# The result is also kept in @connection_hash.
#
# @return [Hash] client configuration
def connection_config
  config = Dynamoid::Config
  settings = @connection_hash = {}

  configured = config.settings.compact.keys & CONNECTION_CONFIG_OPTIONS
  configured.each { |name| settings[name] = config.send(name) }

  if config.credentials?
    # a credentials object already carries the access key and the secret key
    settings[:credentials] = config.credentials
  else
    # otherwise hand over the raw keys so that credentials can be created from them
    settings[:access_key_id] = config.access_key if config.access_key?
    settings[:secret_access_key] = config.secret_key if config.secret_key?
  end

  settings[:logger] = config.logger
  settings[:log_level] = :debug

  # https://github.com/aws/aws-sdk-ruby/blob/master/gems/aws-sdk-core/lib/aws-sdk-core/plugins/logging.rb
  # https://github.com/aws/aws-sdk-ruby/blob/master/gems/aws-sdk-core/lib/aws-sdk-core/log/formatter.rb
  settings[:log_formatter] = config.log_formatter if config.log_formatter

  settings
end
# Return the client object.
#
# @return the object stored in @client by #connect!; nil as long as
#   #connect! has not assigned it
#
# @since 1.0.0
def client
@client
end
# Puts multiple items into one table.
#
# Items are sanitized and then written in batches of at most
# BATCH_WRITE_ITEM_REQUESTS_LIMIT put requests. Items the service reports
# as unprocessed are appended to the pending list and sent again with a
# later batch, until nothing is left.
#
# If an optional block is given it is called once per written batch with a
# boolean flag: true if the batch left unprocessed items, otherwise false.
#
# @example Saves several items to the table table1
#   Dynamoid::AdapterPlugin::AwsSdkV3.batch_write_item('table1', [{ id: '1', name: 'a' }, { id: '2', name: 'b'}])
#
# @example Pass block
#   Dynamoid::AdapterPlugin::AwsSdkV3.batch_write_item('table1', items) do |bool|
#     if bool
#       puts 'there are unprocessed items'
#     end
#   end
#
# @param [String] table_name the name of the table
# @param [Array] objects to be processed
# @param [Hash] options additional options, merged into (and overriding) the request parameters
# @yield [true|false] invokes an optional block with argument - whether there are unprocessed items
# @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the SDK raises its ConditionalCheckFailedException
#
# See:
# * http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
# * http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_write_item-instance_method
def batch_write_item(table_name, objects, options = {})
  pending = objects.map { |object| sanitize_item(object) }

  while pending.present?
    put_requests = pending.shift(BATCH_WRITE_ITEM_REQUESTS_LIMIT).map do |item|
      { put_request: { item: item } }
    end

    params = {
      request_items: { table_name => put_requests },
      return_consumed_capacity: 'TOTAL',
      return_item_collection_metrics: 'SIZE'
    }.merge!(options)
    response = client.batch_write_item(params)

    yield(response.unprocessed_items.present?) if block_given?
    next unless response.unprocessed_items.present?

    # re-queue whatever the service did not process
    pending += response.unprocessed_items[table_name].map { |request| request.put_request.item }
  end
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
  raise Dynamoid::Errors::ConditionalCheckFailedException, e
end
# Get many items at once from DynamoDB. More efficient than getting each item individually.
#
# Every table name is first resolved to its table description (see
# #describe_table); the actual work is delegated to BatchGetItem.
#
# If an optional block is passed, `nil` will be returned and the block will be called for each read batch
# of items, meaning once per batch.
#
# Block receives parameters:
# * hash with items like `{ table_name: [items]}`
# * and boolean flag that is true if there are some unprocessed keys, otherwise false.
#
# @example Retrieve IDs 1 and 2 from the table table1
#   Dynamoid::AdapterPlugin::AwsSdkV3.batch_get_item('table1' => ['1', '2'])
#
# @example Pass block to receive each batch
#   Dynamoid::AdapterPlugin::AwsSdkV3.batch_get_item('table1' => ids) do |hash, bool|
#     puts hash['table1']
#
#     if bool
#       puts 'there are unprocessed keys'
#     end
#   end
#
# @param [Hash] table_names_with_ids the hash of tables and IDs to retrieve
# @param [Hash] options to be passed to underlying BatchGet call
# @param [Proc] block optional block can be passed to handle each batch of items
#
# @return [Hash] a hash where keys are the table names and the values are the retrieved items
#
# See:
# * http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_get_item-instance_method
#
# @since 1.0.0
def batch_get_item(table_names_with_ids, options = {}, &block)
  tables_with_ids = table_names_with_ids.each_with_object({}) do |(name, ids), resolved|
    resolved[describe_table(name)] = ids
  end

  BatchGetItem.new(client, tables_with_ids, options).call(&block)
end
# Delete many items at once from DynamoDB. More efficient than deleting each item individually.
#
# The IDs of every table are split into slices of at most
# BATCH_WRITE_ITEM_REQUESTS_LIMIT delete requests; all slices are built
# first and then sent one by one.
#
# @example Delete IDs 1 and 2 from the table table1
#   Dynamoid::AdapterPlugin::AwsSdkV3.batch_delete_item('table1' => ['1', '2'])
#   or, with hash and range keys,
#   Dynamoid::AdapterPlugin::AwsSdkV3.batch_delete_item('table1' => [['hk1', 'rk2'], ['hk1', 'rk2']])
#
# @param [Hash] options the hash of tables and IDs to delete
# @return [Array<Hash>] the list of request_items hashes that were sent
# @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the SDK raises its ConditionalCheckFailedException
#
# See:
# * http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
# * http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_write_item-instance_method
#
# TODO handle rejections because of internal processing failures
def batch_delete_item(options)
  requests = options.flat_map do |table_name, ids|
    table = describe_table(table_name)

    ids.each_slice(BATCH_WRITE_ITEM_REQUESTS_LIMIT).map do |ids_slice|
      # an id is either a bare hash key or a [hash_key, range_key] pair
      deletions = ids_slice.map { |id| { delete_request: { key: key_stanza(table, *id) } } }
      { table_name => deletions }
    end
  end

  requests.each do |request_items|
    client.batch_write_item(
      request_items: request_items,
      return_consumed_capacity: 'TOTAL',
      return_item_collection_metrics: 'SIZE'
    )
  end
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
  raise Dynamoid::Errors::ConditionalCheckFailedException, e
end
# Create a table on DynamoDB. This usually takes a long time to complete.
#
# The work is delegated to CreateTable, which receives the options unchanged.
#
# @param [String] table_name the name of the table to create
# @param [Symbol] key the table's primary key (defaults to :id)
# @param [Hash] options provide a range key here if the table has a composite key
# @option options [Array<Dynamoid::Indexes::Index>] local_secondary_indexes
# @option options [Array<Dynamoid::Indexes::Index>] global_secondary_indexes
# @option options [Symbol] hash_key_type The type of the hash key
# @option options [Boolean] sync Wait for table status to be ACTIVE?
# @return [Boolean] true when the creation call succeeded; false when the SDK
#   raised ResourceInUseException (logged as "already exists")
# @since 1.0.0
def create_table(table_name, key = :id, options = {})
  Dynamoid.logger.info "Creating #{table_name} table. This could take a while."
  creation = CreateTable.new(client, table_name, key, options)
  creation.call
  true
rescue Aws::DynamoDB::Errors::ResourceInUseException
  Dynamoid.logger.error "Table #{table_name} cannot be created as it already exists"
  false
end
# Enable time-to-live on a table.
#
# Sends an update_time_to_live request whose specification names the given
# attribute and sets `enabled` to true.
#
# @param [String] table_name the name of the table
# @param attribute the name of the attribute passed as `attribute_name`
# @return the client's response to update_time_to_live
def update_time_to_live(table_name, attribute)
  ttl_specification = { attribute_name: attribute, enabled: true }

  client.update_time_to_live(
    table_name: table_name,
    time_to_live_specification: ttl_specification
  )
end
# Create a table on DynamoDB *synchronously*.
#
# Shortcut for #create_table with the `sync: true` option added to a copy
# of the given options.
#
# This usually takes a long time to complete.
# CreateTable is normally an asynchronous operation.
# You can optionally define secondary indexes on the new table,
# as part of the CreateTable operation.
# If you want to create multiple tables with secondary indexes on them,
# you must create the tables sequentially.
# Only one table with secondary indexes can be
# in the CREATING state at any given time.
# See: http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#create_table-instance_method
#
# @param [String] table_name the name of the table to create
# @param [Symbol] key the table's primary key (defaults to :id)
# @param [Hash] options provide a range key here if the table has a composite key
# @option options [Array<Dynamoid::Indexes::Index>] local_secondary_indexes
# @option options [Array<Dynamoid::Indexes::Index>] global_secondary_indexes
# @option options [Symbol] hash_key_type The type of the hash key
# @return whatever #create_table returns
# @since 1.2.0
def create_table_synchronously(table_name, key = :id, options = {})
  synchronous_options = options.merge(sync: true)
  create_table(table_name, key, synchronous_options)
end
# Removes an item from DynamoDB.
#
# @param [String] table_name the name of the table
# @param [String] key the hash key of the item to delete
# @param [Hash] options provide a range key here if the table has a composite key
# @option options [Object] :range_key the range key of the item
# @option options [Hash] :conditions conditions turned into the `expected`
#   stanza of the request (see #expected_stanza); the caller's hash is left untouched
# @return the client's response to delete_item
# @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the SDK raises its ConditionalCheckFailedException
#
# @since 1.0.0
#
# @todo: Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#delete_item-instance_method
def delete_item(table_name, key, options = {})
  options ||= {}
  range_key = options[:range_key]
  # expected_stanza deletes the keys it consumes from the hash it is given,
  # so work on a copy instead of emptying the caller's conditions
  conditions = options[:conditions]&.dup
  table = describe_table(table_name)

  client.delete_item(
    table_name: table_name,
    key: key_stanza(table, key, range_key),
    expected: expected_stanza(conditions)
  )
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
  raise Dynamoid::Errors::ConditionalCheckFailedException, e
end
# Deletes an entire table from DynamoDB and drops it from the table cache.
#
# With the `sync` option, and when the response reports the table as
# DELETING, the call blocks in UntilPastTableStatus until that status is over.
#
# @param [String] table_name the name of the table to destroy
# @param [Hash] options
# @option options [Boolean] sync Wait for table status check to raise ResourceNotFoundException
# @return the entry removed from the table cache (nil if the table was not cached)
# @raise [Aws::DynamoDB::Errors::ResourceInUseException] re-raised after being logged
#
# @since 1.0.0
def delete_table(table_name, options = {})
  response = client.delete_table(table_name: table_name)

  if options[:sync] && PARSE_TABLE_STATUS.call(response, :table_description) == TABLE_STATUSES[:deleting]
    UntilPastTableStatus.new(client, table_name, :deleting).call
  end

  table_cache.delete(table_name)
rescue Aws::DynamoDB::Errors::ResourceInUseException => e
  Dynamoid.logger.error "Table #{table_name} cannot be deleted as it is in use"
  raise e
end
# Deletes a table and waits for the deletion to finish.
#
# Shortcut for #delete_table with the `sync: true` option added to a copy
# of the given options.
#
# @param [String] table_name the name of the table to destroy
# @param [Hash] options additional options for #delete_table
# @return whatever #delete_table returns
def delete_table_synchronously(table_name, options = {})
  synchronous_options = options.merge(sync: true)
  delete_table(table_name, synchronous_options)
end
# @todo Add a DescribeTable method.
# Fetches an item from DynamoDB.
#
# Works on a copy of the options, so the caller's hash is not modified.
#
# @param [String] table_name the name of the table
# @param [String] key the hash key of the item to find
# @param [Hash] options provide a range key here if the table has a composite key
# @option options [Object] :range_key the range key of the item
# @option options [Boolean] :consistent_read passed through to the request
#
# @return [Hash, nil] a hash representing the raw item in DynamoDB, or nil
#   when the response contains no item
#
# @since 1.0.0
#
# @todo Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#get_item-instance_method
def get_item(table_name, key, options = {})
  options = options.dup || {}
  table = describe_table(table_name)
  range_key = options.delete(:range_key)
  consistent_read = options.delete(:consistent_read)

  response = client.get_item(
    table_name: table_name,
    key: key_stanza(table, key, range_key),
    consistent_read: consistent_read
  )
  item = response[:item]

  item_to_hash(item) if item
end
# Edits an existing item's attributes, or adds a new item to the table if it does not already exist. You can put, delete, or add attribute values
#
# @param [String] table_name the name of the table
# @param [String] key the hash key of the item to find
# @param [Hash] options provide a range key here if the table has a composite key
#
# @return new attributes for the record
#
# @todo Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#update_item-instance_method
def update_item(table_name, key, options = {})
options = options.dup
range_key = options.delete(:range_key)
conditions = options.delete(:conditions)
table = describe_table(table_name)
item_updater = ItemUpdater.new(table, key, range_key)
yield(item_updater)
raise "non-empty options: #{options}" unless options.empty?
result = client.update_item(table_name: table_name,
key: key_stanza(table, key, range_key),
attribute_updates: item_updater.attribute_updates,
expected: expected_stanza(conditions),
return_values: 'ALL_NEW')
item_to_hash(result[:attributes])
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
raise Dynamoid::Errors::ConditionalCheckFailedException, e
end
# List all tables on DynamoDB.
#
# Pages through the list_tables API: each request starts after the
# last_evaluated_table_name of the previous page, until a page reports none.
#
# @return [Array] the table names of all pages, in the order received
#
# @since 1.0.0
def list_tables
  table_names = []
  start_after = nil

  loop do
    page = client.list_tables(exclusive_start_table_name: start_after)
    start_after = page.last_evaluated_table_name
    table_names.concat(page.table_names)
    break unless start_after
  end

  table_names
end
# Persists an item on DynamoDB.
#
# The condition keys understood by #expected_stanza (:unless_exists,
# :if_exists, :if) are turned into the `expected` stanza of the request;
# every other option is merged into the request as is. The caller's options
# hash is never modified.
#
# @param [String] table_name the name of the table
# @param [Object] object a hash or Dynamoid object to persist
# @param [Hash] options conditions and additional request parameters
# @return the client's response to put_item
# @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the SDK raises its ConditionalCheckFailedException
#
# @since 1.0.0
#
# See: http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#put_item-instance_method
def put_item(table_name, object, options = {})
  # expected_stanza strips the condition keys from the hash it is given.
  # Work on a copy so a caller reusing its options (e.g. for a retry) does
  # not silently lose the conditions.
  options = options ? options.dup : {}
  item = sanitize_item(object)
  # must run before the merge below: it removes the condition keys so that
  # only genuine request parameters remain in options
  expected = expected_stanza(options)

  client.put_item(
    {
      table_name: table_name,
      item: item,
      expected: expected
    }.merge!(options)
  )
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
  raise Dynamoid::Errors::ConditionalCheckFailedException, e
end
# Query the DynamoDB table. This employs DynamoDB's indexes so is generally faster than scanning, but is
# only really useful for range queries, since it can only find by one hash key at once. Only provide
# one range key to the hash.
#
#   Dynamoid.adapter.query('users', { id: [[:eq, '1']], age: [[:between, [10, 30]]] }, { batch_size: 1000 })
#
# Nothing is requested until the returned enumerator is iterated. Each
# iteration step yields two values for one result page: the page's items
# (converted with #item_to_hash) and a hash `{ last_evaluated_key: ... }`.
#
# @param [String] table_name the name of the table
# @param [Hash] key_conditions conditions for the primary key attributes (see the example above)
# @param [Hash] non_key_conditions (optional) conditions for non-primary key attributes
# @param [Hash] options (optional) the options to query the table with
# @option options [Boolean] :consistent_read You can set the ConsistentRead parameter to true and obtain a strongly consistent result
# @option options [Boolean] :scan_index_forward Specifies the order for index traversal: If true (default), the traversal is performed in ascending order; if false, the traversal is performed in descending order.
# @option options [Symbol] :select The attributes to be returned in the result (one of ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, ...)
# @option options [Symbol] :index_name The name of an index to query. This index can be any local secondary index or global secondary index on the table.
# @option options [Hash] :exclusive_start_key The primary key of the first item that this operation will evaluate.
# @option options [Integer] :batch_size The number of items to lazily load one by one
# @option options [Integer] :record_limit The maximum number of items to return (not necessarily the number of evaluated items)
# @option options [Integer] :scan_limit The maximum number of items to evaluate (not necessarily the number of matching items)
# @option options [Array[Symbol]] :project The attributes to retrieve from the table
#
# @return [Enumerator] matching items, page by page
#
# @since 1.0.0
#
# @todo Provide support for various other options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#query-instance_method
def query(table_name, key_conditions, non_key_conditions = {}, options = {})
  Enumerator.new do |yielder|
    table = describe_table(table_name)
    pages = Query.new(client, table, key_conditions, non_key_conditions, options).call

    pages.each do |page|
      items = page.items.map { |item| item_to_hash(item) }
      yielder.yield(items, last_evaluated_key: page.last_evaluated_key)
    end
  end
end
# Count the items matching a query.
#
# Runs the query with `select: 'COUNT'` and adds up the `count` of every
# result page. The caller's options hash is not modified.
#
# @param [String] table_name the name of the table
# @param key_conditions conditions for the primary key attributes, passed on to Query
# @param non_key_conditions conditions for non-primary key attributes, passed on to Query
# @param [Hash] options the options to query the table with
# @return [Integer] the total count; 0 when the query produces no pages
def query_count(table_name, key_conditions, non_key_conditions, options)
  table = describe_table(table_name)
  # merge instead of assigning so the caller's hash stays untouched
  count_options = options.merge(select: 'COUNT')

  Query.new(client, table, key_conditions, non_key_conditions, count_options)
       .call
       .sum(&:count)
end
# Scan the DynamoDB table. This is usually a very slow operation as it naively filters all data on
# the DynamoDB servers.
#
# Nothing is requested until the returned enumerator is iterated. Each
# iteration step yields two values for one result page: the page's items
# (converted with #item_to_hash) and a hash `{ last_evaluated_key: ... }`.
#
# @param [String] table_name the name of the table
# @param [Hash] conditions a hash of attributes: matching records will be returned by the scan
# @param [Hash] options additional options, passed on to Scan
#
# @return [Enumerator] matching items, page by page
#
# @since 1.0.0
#
# @todo: Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#scan-instance_method
def scan(table_name, conditions = {}, options = {})
  Enumerator.new do |yielder|
    table = describe_table(table_name)
    pages = Scan.new(client, table, conditions, options).call

    pages.each do |page|
      items = page.items.map { |item| item_to_hash(item) }
      yielder.yield(items, last_evaluated_key: page.last_evaluated_key)
    end
  end
end
# Count the items matching a scan.
#
# Runs the scan with `select: 'COUNT'` and adds up the `count` of every
# result page. The caller's options hash is not modified.
#
# @param [String] table_name the name of the table
# @param [Hash] conditions a hash of attributes, passed on to Scan
# @param [Hash] options additional options, passed on to Scan
# @return [Integer] the total count; 0 when the scan produces no pages
def scan_count(table_name, conditions = {}, options = {})
  table = describe_table(table_name)
  # merge instead of assigning so the caller's hash stays untouched
  count_options = options.merge(select: 'COUNT')

  Scan.new(client, table, conditions, count_options)
      .call
      .sum(&:count)
end
#
# Truncates all records in the given table
#
# Scans the whole table, collects the key of every item (the hash key, or a
# [hash key, range key] pair when the table has a range key) and deletes
# them with #batch_delete_item.
#
# @param [String] table_name the name of the table
# @return whatever #batch_delete_item returns
#
# @since 1.0.0
def truncate(table_name)
  table = describe_table(table_name)
  hash_key = table.hash_key
  range_key = table.range_key

  # scan yields the items of a page first, so the single block parameter
  # receives the items array of each page
  items = scan(table_name, {}, {}).flat_map { |page_items| page_items }
  ids = items.map do |attributes|
    next attributes[hash_key] unless range_key

    [attributes[hash_key], attributes[range_key.to_sym]]
  end

  batch_delete_item(table_name => ids)
end
# Number of items in the table as reported by the table description.
#
# The description is always reloaded (the cached one is bypassed), and its
# item_count is returned.
#
# @param [String] table_name the name of the table
# @return the item_count of the freshly described table
def count(table_name)
  table = describe_table(table_name, true)
  table.item_count
end
# Run PartiQL query.
#
#   Dynamoid.adapter.execute("SELECT * FROM users WHERE id = ?", ["758"])
#
# The statement is run by ExecuteStatement. An Array result is returned as
# is; any other result is treated as an enumeration of item arrays and is
# flattened lazily. A ConditionalCheckFailedException raised by the SDK is
# swallowed and reported as an empty Array.
#
# @param [String] statement PartiQL statement
# @param [Array] parameters a list of bind parameters
# @param [Hash] options
# @option [Boolean] consistent_read
# @return [[] | Array[Hash] | Enumerator::Lazy[Hash]] items when used a SELECT statement and empty Array otherwise
#
def execute(statement, parameters = [], options = {})
  result = ExecuteStatement.new(client, statement, parameters, options).call
  return result if result.is_a?(Array)

  result.lazy.flat_map { |page_items| page_items }
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
  []
end
protected
#
# The key hash passed on get_item, put_item, delete_item, update_item, etc
#
# Maps the table's hash key name (as a String) to the given hash key value
# and, only when a truthy range key value is given, the table's range key
# name to that value.
#
# @param table an object responding to hash_key and range_key
# @param hash_key the hash key value
# @param range_key the optional range key value
# @return [Hash] the key hash
#
def key_stanza(table, hash_key, range_key = nil)
  stanza = { table.hash_key.to_s => hash_key }
  return stanza unless range_key

  stanza.merge!(table.range_key.to_s => range_key)
end
#
# Builds the Expected stanza for the given conditions hash.
#
# NOTE: the keys :unless_exists, :if_exists and :if are deleted from the
# given hash while it is processed; other keys are left in place.
#
# @param [Hash] conditions Conditions to enforce on operation (e.g. { :if => { :count => 5 }, :unless_exists => ['id']})
# @return [Hash] an Expected stanza for the given conditions hash; it creates
#   an empty entry for any key that is looked up but missing
#
def expected_stanza(conditions = nil)
  stanza = Hash.new { |hash, column| hash[column] = {} }
  return stanza unless conditions

  conditions.delete(:unless_exists).try(:each) { |column| stanza[column.to_s][:exists] = false }

  conditions.delete(:if_exists).try(:each) do |column, value|
    stanza[column.to_s].merge!(exists: true, value: value)
  end

  conditions.delete(:if).try(:each) { |column, value| stanza[column.to_s][:value] = value }

  stanza
end
#
# New, semi-arbitrary API to get data on the table
#
# Returns the cached Table for the name unless a reload is requested or
# nothing is cached yet; in that case the table is described through the
# client, wrapped in a Table and stored in the cache.
#
# @param [String] table_name the name of the table
# @param [Boolean] reload bypass the cache and describe the table again
# @return [Table] the (possibly cached) table description
#
def describe_table(table_name, reload = false)
  cached = table_cache[table_name] unless reload
  return cached if cached

  description = client.describe_table(table_name: table_name).data
  table_cache[table_name] = Table.new(description)
end
#
# Converts a hash returned by get_item, scan, etc. into a key-value hash
#
# Delegates entirely to `symbolize_keys` on the given hash. That method is
# not defined in this file - presumably ActiveSupport's Hash extension, which
# returns a new hash with symbol keys (TODO confirm).
#
# @param [Hash] hash the raw item
# @return the result of `hash.symbolize_keys`
#
def item_to_hash(hash)
hash.symbolize_keys
end
# Prepares attributes for being written to DynamoDB.
#
# * drops attributes whose value is an empty Set or an empty String
# * drops attributes with a nil value, unless the
#   `store_attribute_with_nil_value` setting of Dynamoid.config is truthy
# * converts the keys of Hash values to strings (one level deep)
#
# The given attributes are not modified.
#
# @param [Hash] attributes the attributes of the item
# @return [Hash] the sanitized copy
def sanitize_item(attributes)
  keep_nil_values = Dynamoid.config.store_attribute_with_nil_value ? true : false

  kept = attributes.reject do |_, value|
    case value
    when Set, String then value.empty?
    when nil then !keep_nil_values
    else false
    end
  end

  kept.transform_values { |value| value.is_a?(Hash) ? value.stringify_keys : value }
end
end
end
end