Skip to content

Commit

Permalink
Merge pull request #10 from Sage/feature_idempotency
Browse files Browse the repository at this point in the history
Feature idempotency
  • Loading branch information
vaughanbrittonsage authored Sep 7, 2017
2 parents 1e35e62 + b054c31 commit db0d452
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 17 deletions.
6 changes: 6 additions & 0 deletions .rake_tasks~
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
build
clean
clobber
install
install:local
release[remote]
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ This definition can then be used to interact with DynamoDb in relation to the ta

## #create
This method is called create the table definition within a dynamodb account.
> This method should operate in an idempotent manner.
**Params**

- **store** [DynamoDbFramework::Store] [Required] This is used to specify the Dynamodb instance/account to connect to.
- **read_capacity** [Integer] [Optional] [Default=25] This is used to specify the read capacity to provision for this table.
- **write_capacity** [Integer] [Optional] [Default=25] This is used to specify the write capacity to provision for this table.
- **indexes** [Array] [Optional] This is used to specify an array of Index definitions to be created with the table.


ExampleTable.create(read_capacity: 50, write_capacity: 35)
ExampleTable.create(read_capacity: 50, write_capacity: 35, indexes: [ExampleIndex])

## #update
This method is called to update the provisioned capacity for the table.
Expand All @@ -90,6 +92,7 @@ This method is called to update the provisioned capacity for the table.

## #drop
This method is called to drop the table from a dynamodb account.
> This method should operate in an idempotent manner.
**Params**

Expand Down Expand Up @@ -240,6 +243,7 @@ To define a global secondary index in dynamodb create an index definition class

## #create
This method is called create the index definition within a dynamodb account.
> This method should operate in an idempotent manner.
**Params**

Expand All @@ -264,6 +268,7 @@ This method is called to update the provisioned capacity for the index.

## #drop
This method is called to drop the current index.
> This method should operate in an idempotent manner.
**Params**

Expand Down
2 changes: 1 addition & 1 deletion dynamodb_framework.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'pry'
spec.add_dependency 'hash_kit', '~> 0.5'
spec.add_dependency 'json'
spec.add_dependency 'aws-sdk-core'
spec.add_dependency 'aws-sdk-core', '~> 2.10'
end
4 changes: 4 additions & 0 deletions lib/dynamodb_framework/dynamodb_attributes_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,9 @@ def add(*options)

end

def contains(name:)
@attributes.detect { |a| a[:name] == name } != nil
end

end
end
27 changes: 25 additions & 2 deletions lib/dynamodb_framework/dynamodb_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ def range_key(field, type)
self.instance_variable_set(:@range_key, { field: field, type: type })
end

def create(store: DynamoDbFramework.default_store, read_capacity: 25, write_capacity: 25)
def create(store: DynamoDbFramework.default_store, read_capacity: 25, write_capacity: 25, submit: true)
unless self.instance_variable_defined?(:@table)
raise DynamoDbFramework::Index::InvalidConfigException.new('Table must be specified.')
end
table = self.instance_variable_get(:@table)
table_name = table.config[:table_name]

#make method idempotent
if exists?(store: store)
wait_until_active(store: store)
return
end

unless self.instance_variable_defined?(:@partition_key)
raise DynamoDbFramework::Index::InvalidConfigException.new('Partition key must be specified.')
end
Expand Down Expand Up @@ -80,7 +86,11 @@ def create(store: DynamoDbFramework.default_store, read_capacity: 25, write_capa

index =table_manager.create_global_index(full_index_name, partition_key[:field], range_key_field, read_capacity, write_capacity)

table_manager.add_index(table_name, builder.attributes, index)
if submit
table_manager.add_index(table_name, builder.attributes, index)
end

index
end

def update(store: DynamoDbFramework.default_store, read_capacity:, write_capacity:)
Expand All @@ -99,6 +109,10 @@ def drop(store: DynamoDbFramework.default_store)
table = self.instance_variable_get(:@table)
table_name = table.config[:table_name]

unless exists?(store: store)
return
end

DynamoDbFramework::TableManager.new(store).drop_index(table_name, full_index_name)
end

Expand All @@ -111,6 +125,15 @@ def exists?(store: DynamoDbFramework.default_store)
DynamoDbFramework::TableManager.new(store).has_index?(table_name, full_index_name)
end

def wait_until_active(store: DynamoDbFramework.default_store)
unless self.instance_variable_defined?(:@table)
raise DynamoDbFramework::Index::InvalidConfigException.new('Table must be specified.')
end
table = self.instance_variable_get(:@table)
table_name = table.config[:table_name]
DynamoDbFramework::TableManager.new(store).wait_until_index_active(table_name, full_index_name)
end

def query(partition:)
DynamoDbFramework::Query.new(index_name: config[:index_name], table_name: config[:table].config[:table_name], partition_key: config[:partition_key][:field], partition_value: partition)
end
Expand Down
45 changes: 43 additions & 2 deletions lib/dynamodb_framework/dynamodb_table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ def range_key(field, type)
self.instance_variable_set(:@range_key, { field: field, type: type })
end

def create(store: DynamoDbFramework.default_store, read_capacity: 25, write_capacity: 25)
def create(store: DynamoDbFramework.default_store, read_capacity: 25, write_capacity: 25, indexes: [])

#make method idempotent
if exists?(store: store)
wait_until_active(store: store)
return
end

unless self.instance_variable_defined?(:@partition_key)
raise DynamoDbFramework::Table::InvalidConfigException.new('Partition key must be specified.')
end
Expand All @@ -57,21 +64,55 @@ def create(store: DynamoDbFramework.default_store, read_capacity: 25, write_capa
builder.add({ name: range_key[:field], type: range_key[:type], key: :range })
end

DynamoDbFramework::TableManager.new(store).create_table({ name: full_table_name, attributes: builder.attributes, read_capacity: read_capacity, write_capacity: write_capacity })
global_indexes = nil
if indexes != nil && indexes.length > 0
global_indexes = []
indexes.each do |i|
global_indexes << i.create(store: store, submit: false)
index_partition_key = i.instance_variable_get(:@partition_key)
unless builder.contains(name: index_partition_key[:field])
builder.add({ name: index_partition_key[:field], type: index_partition_key[:type] })
end
if i.instance_variable_defined?(:@range_key)
index_range_key = i.instance_variable_get(:@range_key)
unless builder.contains(name: index_range_key[:field])
builder.add({ name: index_range_key[:field], type: index_range_key[:type] })
end
end

end
end

DynamoDbFramework::TableManager.new(store).create_table({ name: full_table_name, attributes: builder.attributes, read_capacity: read_capacity, write_capacity: write_capacity, global_indexes: global_indexes })
end

def update(store: DynamoDbFramework.default_store, read_capacity:, write_capacity:)
DynamoDbFramework::TableManager.new(store).update_throughput(full_table_name, read_capacity, write_capacity)
end

def drop(store: DynamoDbFramework.default_store)
unless exists?(store: store)
return
end
DynamoDbFramework::TableManager.new(store).drop(full_table_name)
end

def exists?(store: DynamoDbFramework.default_store)
DynamoDbFramework::TableManager.new(store).exists?(full_table_name)
end

def wait_until_active(store: DynamoDbFramework.default_store)
DynamoDbFramework::TableManager.new(store).wait_until_active(full_table_name)
end

def get_status(store: DynamoDbFramework.default_store)
DynamoDbFramework::TableManager.new(store).get_status(full_table_name)
end

def active?(store: DynamoDbFramework.default_store)
DynamoDbFramework::TableManager.new(store).get_status(full_table_name) == 'ACTIVE'
end

def query(partition:)
DynamoDbFramework::Query.new(table_name: config[:table_name], partition_key: config[:partition_key][:field], partition_value: partition)
end
Expand Down
39 changes: 33 additions & 6 deletions lib/dynamodb_framework/dynamodb_table_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ def update_index_throughput(table_name, index_name, read_capacity, write_capacit
end

def drop_index(table_name, index_name)
unless has_index?(table_name, index_name)
return
end

table = {
:table_name => table_name,
:global_secondary_index_updates => [
Expand All @@ -130,6 +134,7 @@ def drop_index(table_name, index_name)
# wait for table to be updated
DynamoDbFramework.logger.info "[#{self.class}] -Deleting global index: #{index_name}."
wait_until_index_dropped(table_name, index_name)

DynamoDbFramework.logger.info "[#{self.class}] -Index: [#{index_name}] dropped."
end

Expand All @@ -154,9 +159,13 @@ def get_index_status(table_name, index_name)
return nil
end

def wait_timeout
Time.now + 900 #15 minutes
end

def wait_until_active(table_name)

end_time = Time.now + 300
end_time = wait_timeout
while Time.now < end_time do

status = get_status(table_name)
Expand All @@ -168,13 +177,31 @@ def wait_until_active(table_name)
sleep(5)
end

raise "Timeout occured while waiting for table: #{table_name}, to become active."
raise "Timeout occurred while waiting for table: #{table_name}, to become active."

end

def wait_until_dropped(table_name)

end_time = wait_timeout
while Time.now < end_time do

status = get_status(table_name)

if status == nil
return
end

sleep(5)
end

raise "Timeout occurred while waiting for table: #{table_name}, to be dropped."

end

def wait_until_index_active(table_name, index_name)

end_time = Time.now + 300
end_time = wait_timeout
while Time.now < end_time do

status = get_index_status(table_name, index_name)
Expand All @@ -186,13 +213,13 @@ def wait_until_index_active(table_name, index_name)
sleep(5)
end

raise "Timeout occured while waiting for table: #{table_name}, index: #{index_name}, to become active."
raise "Timeout occurred while waiting for table: #{table_name}, index: #{index_name}, to become active."

end

def wait_until_index_dropped(table_name, index_name)

end_time = Time.now + 300
end_time = wait_timeout
while Time.now < end_time do

status = get_index_status(table_name, index_name)
Expand All @@ -204,7 +231,7 @@ def wait_until_index_dropped(table_name, index_name)
sleep(5)
end

raise "Timeout occured while waiting for table: #{table_name}, index: #{index_name}, to be dropped."
raise "Timeout occurred while waiting for table: #{table_name}, index: #{index_name}, to be dropped."

end

Expand Down
2 changes: 1 addition & 1 deletion lib/dynamodb_framework/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module DynamoDbFramework
VERSION = '1.5.2'
VERSION = '1.6.0'
end
52 changes: 52 additions & 0 deletions spec/dynamodb_table_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@
end
end

context 'with an index specified' do
let(:table_name) { ExampleTable.config[:table_name] }
let(:index_name) { ExampleIndex.config[:index_name] }
before do
table_manager.drop(table_name)
table_manager.drop_index(table_name, index_name)
end
it 'should create the table and index' do
expect(table_manager.exists?(table_name)).to be false
expect(ExampleIndex.exists?(store: store)).to be false
ExampleTable.create(store: store, indexes: [ExampleIndex])
expect(table_manager.exists?(table_name)).to be true
expect(ExampleIndex.exists?(store: store)).to be true
end
end

context 'without a range key' do
let(:table_name) { ExampleTableWithoutRangeKey.config[:table_name] }
before do
Expand All @@ -33,6 +49,19 @@
expect(table_manager.exists?(table_name)).to be true
end
end

context 'when already exists' do
let(:table_name) { ExampleTableWithoutRangeKey.config[:table_name] }
before do
table_manager.drop(table_name)
ExampleTableWithoutRangeKey.create(store: store)
end
it 'should return without error' do
expect(table_manager.exists?(table_name)).to be true
expect { ExampleTableWithoutRangeKey.create(store: store) }.not_to raise_error
expect(table_manager.exists?(table_name)).to be true
end
end
end
context 'when an invalid table class calls the create method' do
context 'without a table_name specified' do
Expand Down Expand Up @@ -111,6 +140,29 @@
end
end

describe '#wait_until_active' do
context 'when a table exists' do
let(:table_name) { ExampleTable.config[:table_name] }
before do
table_manager.drop(table_name)
ExampleTable.create(store: store)
end
it 'should not raise timeout error' do
expect { ExampleTable.wait_until_active(store: store) }.not_to raise_error
end
end
context 'when a table does NOT exist' do
let(:table_name) { ExampleTable.config[:table_name] }
before do
table_manager.drop(table_name)
allow_any_instance_of(DynamoDbFramework::TableManager).to receive(:wait_timeout).and_return(Time.now)
end
it 'should raise timeout error' do
expect { ExampleTable.wait_until_active(store: store) }.to raise_error
end
end
end

describe '#query' do

let(:repository) do
Expand Down
4 changes: 2 additions & 2 deletions spec/example_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ class ExampleIndexWithoutPartitionKey
extend DynamoDbFramework::Index

table ExampleTable
index_name 'example_index'
index_name 'example_index.without.partition'
range_key :id, :S

end

class ExampleIndexWithoutRangeKey
extend DynamoDbFramework::Index

index_name 'example_index'
index_name 'example_index.without.range'
table ExampleTable
partition_key :name, :S

Expand Down
Loading

0 comments on commit db0d452

Please sign in to comment.