diff --git a/lib/dynamoid/adapter_plugin/aws_sdk_v3.rb b/lib/dynamoid/adapter_plugin/aws_sdk_v3.rb index 87d3e4c7..beaeaa61 100644 --- a/lib/dynamoid/adapter_plugin/aws_sdk_v3.rb +++ b/lib/dynamoid/adapter_plugin/aws_sdk_v3.rb @@ -180,6 +180,27 @@ def batch_write_item(table_name, objects, options = {}) end end + def transact_write_items(table_name, models, _options = {}) + out_put = process(table_name, models) + begin + response = client.transact_write_items({ + + transact_items: out_put, + return_consumed_capacity: 'TOTAL', + return_item_collection_metrics: 'SIZE' + }) + rescue StandardError => e + puts e + end + end + + def process(table_name, objects) + objects.map do |m| + key, attrib = m.flatten + { key => { item: sanitize_item(attrib), table_name: table_name } } + end + end + # Get many items at once from DynamoDB. More efficient than getting each item individually. # # If optional block is passed `nil` will be returned and the block will be called for each read batch of items, diff --git a/lib/dynamoid/persistence.rb b/lib/dynamoid/persistence.rb index 329a8f03..716fc645 100644 --- a/lib/dynamoid/persistence.rb +++ b/lib/dynamoid/persistence.rb @@ -9,6 +9,7 @@ require 'dynamoid/persistence/upsert' require 'dynamoid/persistence/save' require 'dynamoid/persistence/update_validations' +require 'dynamoid/persistence/transact' # encoding: utf-8 module Dynamoid @@ -159,6 +160,18 @@ def import(array_of_attributes) Import.call(self, array_of_attributes) end + # perfom multiple atomic operations synchronously + # + # similar to +import+ transact is a low-level method and won't have + # mechanisms like callback and validation + # + # users = User.transact({condition_check: {}, put: {}, delete: {}, update: {}}) + # @param array_of_attributes [Array] + # @return [Array] Created models + def transact(list_of_operations) + Transact.call(self, list_of_operations) + end + # Create a model. # # Initializes a new model and immediately saves it to DynamoDB. diff --git a/lib/dynamoid/persistence/transact.rb b/lib/dynamoid/persistence/transact.rb new file mode 100644 index 00000000..a6fef1a2 --- /dev/null +++ b/lib/dynamoid/persistence/transact.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +require 'securerandom' +module Dynamoid + module Persistence + # @private + class Transact + def self.call(model_class, set_of_operations) + new(model_class, set_of_operations).call + end + + def initialize(model_class, set_of_operations) + @model_class = model_class + @set_of_operations = set_of_operations + end + + def call + models = @set_of_operations.map(&method(:build_model)) + transact(models) + end + + def build_model(attributes) + # attrs = attributes.symbolize_keys + + type, attrib = attributes.flatten + + attrs = attrib.symbolize_keys + + operations = {} + + if @model_class.timestamps_enabled? + time_now = DateTime.now.in_time_zone(Time.zone) + attrs[:created_at] ||= time_now + attrs[:updated_at] ||= time_now + end + + @model_class.build(attrs).tap do |model| + model.hash_key = SecureRandom.uuid if model.hash_key.blank? + operations[type] = model + end + + operations + end + + def transact(models) + Dynamoid.adapter.transact_write_items(@model_class.table_name, array_of_dumped_attributes(models)) + end + + def array_of_dumped_attributes(models) + models.map do |m| + key, model = m.flatten + { key => Dumping.dump_attributes(model.attributes, @model_class.attributes) } + end + end + end + end +end diff --git a/spec/dynamoid/persistence_spec.rb b/spec/dynamoid/persistence_spec.rb index 345f6dbd..c1ab90ed 100644 --- a/spec/dynamoid/persistence_spec.rb +++ b/spec/dynamoid/persistence_spec.rb @@ -1306,7 +1306,7 @@ def log_message obj = document_class.create(title: 'New Document') expect { - document_class.update_fields(obj.id, { title: 'New title', publisher: 'New publisher' } ) + document_class.update_fields(obj.id, { title: 'New title', publisher: 'New publisher' }) }.to raise_error Dynamoid::Errors::UnknownAttribute end end @@ -1491,7 +1491,7 @@ def log_message obj = document_class.create(title: 'New Document') expect { - document_class.upsert(obj.id, { title: 'New title', publisher: 'New publisher' } ) + document_class.upsert(obj.id, { title: 'New title', publisher: 'New publisher' }) }.to raise_error Dynamoid::Errors::UnknownAttribute end end @@ -2594,6 +2594,20 @@ def log_message end end + describe '.transact' do + before do + Address.create_table + end + + it 'process transaction' do + expect do + Address.transact([{ + put: { city: 'Chicago' } + }, { put: { city: 'New York' } }]) + end.to change { Address.count }.by(2) + end + end + describe '.import' do before do Address.create_table