From a3d9d521ed1c43adcb1a9844c7cf41ddecd90805 Mon Sep 17 00:00:00 2001 From: Andrew Konchin Date: Fri, 23 Mar 2018 23:42:11 +0200 Subject: [PATCH] Use optional backoff in `import` method --- lib/dynamoid/persistence.rb | 22 +++++++-- spec/dynamoid/persistence_spec.rb | 76 +++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/lib/dynamoid/persistence.rb b/lib/dynamoid/persistence.rb index 41819870..5c569adc 100644 --- a/lib/dynamoid/persistence.rb +++ b/lib/dynamoid/persistence.rb @@ -213,13 +213,25 @@ def dynamo_type(type) end def import(objects) - documents = objects.map { |attrs| - self.build(attrs).tap { |item| + documents = objects.map do |attrs| + self.build(attrs).tap do |item| item.hash_key = SecureRandom.uuid if item.hash_key.blank? - } - } + end + end - Dynamoid.adapter.batch_write_item(self.table_name, documents.map(&:dump)) + unless Dynamoid.config.backoff + Dynamoid.adapter.batch_write_item(self.table_name, documents.map(&:dump)) + else + backoff = nil + Dynamoid.adapter.batch_write_item(self.table_name, documents.map(&:dump)) do |has_unprocessed_items| + if has_unprocessed_items + backoff ||= Dynamoid.config.build_backoff + backoff.call + else + backoff = nil + end + end + end documents.each { |d| d.new_record = false } documents diff --git a/spec/dynamoid/persistence_spec.rb b/spec/dynamoid/persistence_spec.rb index 35cc1765..06f99329 100644 --- a/spec/dynamoid/persistence_spec.rb +++ b/spec/dynamoid/persistence_spec.rb @@ -1028,5 +1028,81 @@ def self.name; 'Address'; end user = User.find(users[0].id) expect(user.todo_list).to eq nil end + + context 'backoff is specified' do + let(:backoff_strategy) do + ->(_) { -> { @counter += 1 } } + end + + before do + @old_backoff = Dynamoid.config.backoff + @old_backoff_strategies = Dynamoid.config.backoff_strategies.dup + + @counter = 0 + Dynamoid.config.backoff_strategies[:simple] = backoff_strategy + Dynamoid.config.backoff = { simple: nil } + end + + after do + 
Dynamoid.config.backoff = @old_backoff + Dynamoid.config.backoff_strategies = @old_backoff_strategies + end + + it 'creates multiple documents' do + expect { + Address.import([{city: 'Chicago'}, {city: 'New York'}]) + }.to change { Address.count }.by(2) + end + + it 'uses specified backoff when some items are not processed' do + # dynamodb-local ignores provisioned throughput settings + # so we cannot emulate unprocessed items - let's stub + + klass = new_class + table_name = klass.table_name + items = (1 .. 3).map(&:to_s).map { |id| { id: id } } + + responses = [ + double('response 1', unprocessed_items: { table_name => [ + double(put_request: double(item: { id: '3' })) + ]}), + double('response 2', unprocessed_items: { table_name => [ + double(put_request: double(item: { id: '3' })) + ]}), + double('response 3', unprocessed_items: nil) + ] + allow(Dynamoid.adapter.client).to receive(:batch_write_item).and_return(*responses) + + klass.import(items) + expect(@counter).to eq 2 + end + + it 'uses new backoff after successful call without unprocessed items' do + # dynamodb-local ignores provisioned throughput settings + # so we cannot emulate unprocessed items - let's stub + + klass = new_class + table_name = klass.table_name + # batch_write_item processes up to 25 items at once + # so we emulate 4 calls with items + items = (1 .. 50).map(&:to_s).map { |id| { id: id } } + + responses = [ + double('response 1', unprocessed_items: { table_name => [ + double(put_request: double(item: { id: '25' })) + ]}), + double('response 2', unprocessed_items: nil), + double('response 3', unprocessed_items: { table_name => [ + double(put_request: double(item: { id: '25' })) + ]}), + double('response 4', unprocessed_items: nil) + ] + allow(Dynamoid.adapter.client).to receive(:batch_write_item).and_return(*responses) + + expect(backoff_strategy).to receive(:call).exactly(2).times.and_call_original + klass.import(items) + expect(@counter).to eq 2 + end + end end end