Skip to content

Commit

Permalink
Merge 3f1353a into 8423724
Browse files Browse the repository at this point in the history
  • Loading branch information
btree1970 committed Aug 24, 2020
2 parents 8423724 + 3f1353a commit 8a5b3b3
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 2 deletions.
21 changes: 21 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3.rb
Expand Up @@ -180,6 +180,27 @@ def batch_write_item(table_name, objects, options = {})
end
end

def transact_write_items(table_name, models, _options = {})
out_put = process(table_name, models)
begin
response = client.transact_write_items({

transact_items: out_put,
return_consumed_capacity: 'TOTAL',
return_item_collection_metrics: 'SIZE'
})
rescue StandardError => e
puts e
end
end

def process(table_name, objects)
objects.map do |m|
key, attrib = m.flatten
{ key => { item: sanitize_item(attrib), table_name: table_name } }
end
end

# Get many items at once from DynamoDB. More efficient than getting each item individually.
#
# If optional block is passed `nil` will be returned and the block will be called for each read batch of items,
Expand Down
13 changes: 13 additions & 0 deletions lib/dynamoid/persistence.rb
Expand Up @@ -9,6 +9,7 @@
require 'dynamoid/persistence/upsert'
require 'dynamoid/persistence/save'
require 'dynamoid/persistence/update_validations'
require 'dynamoid/persistence/transact'

# encoding: utf-8
module Dynamoid
Expand Down Expand Up @@ -159,6 +160,18 @@ def import(array_of_attributes)
Import.call(self, array_of_attributes)
end

# perfom multiple atomic operations synchronously
#
# similar to +import+ transact is a low-level method and won't have
# mechanisms like callback and validation
#
# users = User.transact({condition_check: {}, put: {}, delete: {}, update: {}})
# @param array_of_attributes [Array<Hash>]
# @return [Array] Created models
def transact(list_of_operations)
Transact.call(self, list_of_operations)
end

# Create a model.
#
# Initializes a new model and immediately saves it to DynamoDB.
Expand Down
57 changes: 57 additions & 0 deletions lib/dynamoid/persistence/transact.rb
@@ -0,0 +1,57 @@
# frozen_string_literal: true

require 'securerandom'
module Dynamoid
module Persistence
# @private
class Transact
def self.call(model_class, set_of_operations)
new(model_class, set_of_operations).call
end

def initialize(model_class, set_of_operations)
@model_class = model_class
@set_of_operations = set_of_operations
end

def call
models = @set_of_operations.map(&method(:build_model))
transact(models)
end

def build_model(attributes)
# attrs = attributes.symbolize_keys

type, attrib = attributes.flatten

attrs = attrib.symbolize_keys

operations = {}

if @model_class.timestamps_enabled?
time_now = DateTime.now.in_time_zone(Time.zone)
attrs[:created_at] ||= time_now
attrs[:updated_at] ||= time_now
end

@model_class.build(attrs).tap do |model|
model.hash_key = SecureRandom.uuid if model.hash_key.blank?
operations[type] = model
end

operations
end

def transact(models)
Dynamoid.adapter.transact_write_items(@model_class.table_name, array_of_dumped_attributes(models))
end

def array_of_dumped_attributes(models)
models.map do |m|
key, model = m.flatten
{ key => Dumping.dump_attributes(model.attributes, @model_class.attributes) }
end
end
end
end
end
18 changes: 16 additions & 2 deletions spec/dynamoid/persistence_spec.rb
Expand Up @@ -1306,7 +1306,7 @@ def log_message
obj = document_class.create(title: 'New Document')

expect {
document_class.update_fields(obj.id, { title: 'New title', publisher: 'New publisher' } )
document_class.update_fields(obj.id, { title: 'New title', publisher: 'New publisher' })
}.to raise_error Dynamoid::Errors::UnknownAttribute
end
end
Expand Down Expand Up @@ -1491,7 +1491,7 @@ def log_message
obj = document_class.create(title: 'New Document')

expect {
document_class.upsert(obj.id, { title: 'New title', publisher: 'New publisher' } )
document_class.upsert(obj.id, { title: 'New title', publisher: 'New publisher' })
}.to raise_error Dynamoid::Errors::UnknownAttribute
end
end
Expand Down Expand Up @@ -2594,6 +2594,20 @@ def log_message
end
end

describe '.transact' do
before do
Address.create_table
end

it 'process transaction' do
expect do
Address.transact([{
put: { city: 'Chicago' }
}, { put: { city: 'New York' } }])
end.to change { Address.count }.by(2)
end
end

describe '.import' do
before do
Address.create_table
Expand Down

0 comments on commit 8a5b3b3

Please sign in to comment.