Skip to content

Commit

Permalink
Merge pull request #29 from sumocoder/delete_optimistic_locking
Browse files Browse the repository at this point in the history
Added support for optimistic locking on delete
  • Loading branch information
philipmw committed Dec 27, 2015
2 parents ad81a32 + 183b96f commit dde056f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 50 deletions.
90 changes: 49 additions & 41 deletions lib/dynamoid/adapter_plugin/aws_sdk_v2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def batch_get_item(table_ids, options = {})
results = client.batch_get_item(
request_items: request_items
)

ret = Hash.new([].freeze) # Default for tables where no rows are returned
results.data[:responses].each do |table, rows|
ret[table] = rows.collect { |r| result_item_to_hash(r) }
Expand Down Expand Up @@ -107,25 +107,25 @@ def create_table(table_name, key = :id, options = {})
read_capacity = options[:read_capacity] || Dynamoid::Config.read_capacity
write_capacity = options[:write_capacity] || Dynamoid::Config.write_capacity
range_key = options[:range_key]

key_schema = [
{ attribute_name: key.to_s, key_type: HASH_KEY }
]
key_schema << {
key_schema << {
attribute_name: range_key.keys.first.to_s, key_type: RANGE_KEY
} if(range_key)

#TODO: Provide support for number and binary hash key
attribute_definitions = [
{ attribute_name: key.to_s, attribute_type: 'S' }
]
attribute_definitions << {
attribute_name: range_key.keys.first.to_s, attribute_type: api_type(range_key.values.first)
} if(range_key)

client.create_table(table_name: table_name,
provisioned_throughput: {
read_capacity_units: read_capacity,
read_capacity_units: read_capacity,
write_capacity_units: write_capacity
},
key_schema: key_schema,
Expand All @@ -144,9 +144,17 @@ def create_table(table_name, key = :id, options = {})
# @since 1.0.0
#
# @todo: Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#delete_item-instance_method
def delete_item(table_name, key, options = nil)
def delete_item(table_name, key, options = {})
range_key = options[:range_key]
conditions = options[:conditions]
table = describe_table(table_name)
client.delete_item(table_name: table_name, key: key_stanza(table, key, options && options[:range_key]))
client.delete_item(
table_name: table_name,
key: key_stanza(table, key, range_key),
expected: expected_stanza(conditions)
)
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
raise Dynamoid::Errors::ConditionalCheckFailedException, e
end

# Deletes an entire table from DynamoDB.
Expand Down Expand Up @@ -231,7 +239,7 @@ def list_tables
# @todo: Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#put_item-instance_method
def put_item(table_name, object, options = nil)
item = {}

object.each do |k, v|
next if v.nil? || (v.respond_to?(:empty?) && v.empty?)
item[k.to_s] = v
Expand Down Expand Up @@ -342,10 +350,10 @@ def query(table_name, opts = {})
def scan(table_name, scan_hash, select_opts = {})
limit = select_opts.delete(:limit)
batch = select_opts.delete(:batch_size)

request = { table_name: table_name }
request[:limit] = batch || limit if batch || limit
request[:scan_filter] = scan_hash.reduce({}) do |memo, kvp|
request[:scan_filter] = scan_hash.reduce({}) do |memo, kvp|
memo[kvp[0].to_s] = {
attribute_value_list: [kvp[1]],
# TODO: Provide support for all comparison operators
Expand All @@ -369,7 +377,7 @@ def scan(table_name, scan_hash, select_opts = {})
end
end
end


#
# Truncates all records in the given table
Expand All @@ -381,7 +389,7 @@ def truncate(table_name)
table = describe_table(table_name)
hk = table.hash_key
rk = table.range_key

scan(table_name, {}, {}).each do |attributes|
opts = {range_key: attributes[rk.to_sym] } if rk
delete_item(table_name, attributes[hk], opts)
Expand All @@ -393,7 +401,7 @@ def count(table_name)
end

protected

STRING_TYPE = "S".freeze
NUM_TYPE = "N".freeze
BOOLEAN_TYPE = "B".freeze
Expand All @@ -418,28 +426,28 @@ def key_stanza(table, hash_key, range_key = nil)
key[table.range_key.to_s] = range_key if range_key
key
end

#
# @param [Hash] conditions Conditions to enforce on operation (e.g. { :if => { :count => 5 }, :unless_exists => ['id']})
# @return an Expected stanza for the given conditions hash
#
def expected_stanza(conditions = nil)
expected = Hash.new { |h,k| h[k] = {} }
return expected unless conditions

conditions[:unless_exists].try(:each) do |col|
expected[col.to_s][:exists] = false
end
conditions[:if].try(:each) do |col,val|
expected[col.to_s][:value] = val
end

expected
end

HASH_KEY = "HASH".freeze
RANGE_KEY = "RANGE".freeze

#
# New, semi-arbitrary API to get data on the table
#
Expand All @@ -448,7 +456,7 @@ def describe_table(table_name, reload = false)
table_cache[table_name] = Table.new(client.describe_table(table_name: table_name).data)
end
end

#
# Converts a hash returned by get_item, scan, etc. into a key-value hash
#
Expand All @@ -457,35 +465,35 @@ def result_item_to_hash(item)
item.each { |k,v| r[k.to_sym] = v }
end
end

#
# Represents a table. Exposes data from the "DescribeTable" API call, and also
# provides methods for coercing values to the proper types based on the table's schema data
#
class Table
attr_reader :schema

#
# @param [Hash] schema Data returns from a "DescribeTable" call
#
def initialize(schema)
@schema = schema[:table]
end

def range_key
@range_key ||= schema[:key_schema].find { |d| d[:key_type] == RANGE_KEY }.try(:attribute_name)
end

def range_type
range_type ||= schema[:attribute_definitions].find { |d|
range_type ||= schema[:attribute_definitions].find { |d|
d[:attribute_name] == range_key
}.try(:fetch,:attribute_type, nil)
end

def hash_key
@hash_key ||= schema[:key_schema].find { |d| d[:key_type] == HASH_KEY }.try(:attribute_name).to_sym
end

#
# Returns the API type (e.g. "N", "S") for the given column, if the schema defines it,
# nil otherwise
Expand All @@ -500,31 +508,31 @@ def item_count
schema[:item_count]
end
end

#
# Mimics behavior of the yielded object on DynamoDB's update_item API (high level).
# Mimics behavior of the yielded object on DynamoDB's update_item API (high level).
#
class ItemUpdater
attr_reader :table, :key, :range_key

def initialize(table, key, range_key = nil)
@table = table; @key = key, @range_key = range_key
@additions = {}
@deletions = {}
@updates = {}
end

#
# Adds the given values to the values already stored in the corresponding columns.
# The column must contain a Set or a number.
# Adds the given values to the values already stored in the corresponding columns.
# The column must contain a Set or a number.
#
# @param [Hash] vals keys of the hash are the columns to update, vals are the values to
# @param [Hash] vals keys of the hash are the columns to update, vals are the values to
# add. values must be a Set, Array, or Numeric
#
def add(values)
@additions.merge!(values)
end

#
# Removes values from the sets of the given columns
#
Expand All @@ -538,19 +546,19 @@ def delete(values)
#
# Replaces the values of one or more attributes
#
def set(values)
def set(values)
@updates.merge!(values)
end

#
# Returns an AttributeUpdates hash suitable for passing to the V2 Client API
#
def to_h
ret = {}

@additions.each do |k,v|
ret[k.to_s] = {
action: ADD,
ret[k.to_s] = {
action: ADD,
value: v
}
end
Expand All @@ -569,7 +577,7 @@ def to_h

ret
end

ADD = "ADD".freeze
DELETE = "DELETE".freeze
PUT = "PUT".freeze
Expand Down
22 changes: 18 additions & 4 deletions lib/dynamoid/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ def save(options = {})
end

#
# update!() will increment the lock_version if the table has the column, but will not check it. Thus, a concurrent save will
# never cause an update! to fail, but an update! may cause a concurrent save to fail.
# update!() will increment the lock_version if the table has the column, but will not check it. Thus, a concurrent save will
# never cause an update! to fail, but an update! may cause a concurrent save to fail.
#
#
def update!(conditions = {}, &block)
Expand Down Expand Up @@ -216,7 +216,21 @@ def destroy
# @since 0.2.0
def delete
options = range_key ? {:range_key => dump_field(self.read_attribute(range_key), self.class.attributes[range_key])} : {}

# Add an optimistic locking check if the lock_version column exists
if(self.class.attributes[:lock_version])
conditions = {:if => {}}
conditions[:if][:lock_version] =
if changes[:lock_version].nil?
self.lock_version
else
changes[:lock_version][0]
end
options[:conditions] = conditions
end
Dynamoid.adapter.delete(self.class.table_name, self.hash_key, options)
rescue Dynamoid::Errors::ConditionalCheckFailedException
raise Dynamoid::Errors::StaleObjectError.new(self, 'delete')
end

# Dump this object's attributes into hash form, fit to be persisted into the datastore.
Expand Down Expand Up @@ -268,14 +282,14 @@ def dump_field(value, options)
end
end
end

# Persist the object into the datastore. Assign it an id first if it doesn't have one.
#
# @since 0.2.0
def persist(conditions = nil)
run_callbacks(:save) do
self.hash_key = SecureRandom.uuid if self.hash_key.nil? || self.hash_key.blank?

# Add an exists check to prevent overwriting existing records with new ones
if(new_record?)
conditions ||= {}
Expand Down
36 changes: 31 additions & 5 deletions spec/dynamoid/persistence_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@

expect(Dynamoid.adapter.read("dynamoid_tests_addresses", address.id)[:id]).to eq address.id
end

it 'prevents concurrent writes to tables with a lock_version' do
address.save!
a1 = address
a2 = Address.find(address.id)

a1.city = 'Seattle'
a2.city = 'San Francisco'

a1.save!
expect { a2.save! }.to raise_exception(Dynamoid::Errors::StaleObjectError)
end

it 'assigns itself an id on save only if it does not have one' do
address.id = 'test123'
address.save
Expand Down Expand Up @@ -271,12 +271,12 @@
end
end.to raise_error(Dynamoid::Errors::StaleObjectError)
end

it 'prevents concurrent saves to tables with a lock_version' do
address.save!
a2 = Address.find(address.id)
a2.update! { |a| a.set(:city => "Chicago") }

expect do
address.city = "Seattle"
address.save!
Expand All @@ -292,6 +292,32 @@
msg.destroy
end.to_not raise_error
end

context 'with lock version' do
it 'deletes a record if lock version matches' do
address.save!
expect { address.destroy }.to_not raise_error
end

it 'does not delete a record if lock version does not match' do
address.save!
a1 = address
a2 = Address.find(address.id)

a1.city = 'Seattle'
a1.save!

expect { a2.destroy }.to raise_exception(Dynamoid::Errors::StaleObjectError)
end

it 'uses the correct lock_version even if it is modified' do
address.save!
a1 = address
a1.lock_version = 100

expect { a1.destroy }.to_not raise_error
end
end
end

context 'single table inheritance' do
Expand Down

0 comments on commit dde056f

Please sign in to comment.