Skip to content
Permalink
Browse files

Fix threadsafety of Dynamoid::Adapter (issue #10)

I changed Dynamoid::Adapter from being a module having global state to a class
that's instantiated by Dynamoid module.  This makes Adapter more testable and
allows me to write specs verifying its lazy-connection behavior.

I extend the lazy behavior to caching tables -- instead of caching tables when
establishing a connection, we cache on first demand.

Turning Dynamoid::Adapter into a class required me to rename the `Adapter`
namespace (module) of AwsSdkV2 to `AdapterPlugin`.
  • Loading branch information...
Philip White
Philip White committed Jul 7, 2015
1 parent c0cf587 commit ed004b2d53c7500e10bca914ee844957939df2df
@@ -152,6 +152,7 @@ Gem::Specification.new do |s|

s.add_runtime_dependency(%q<activemodel>, ["~> 4"])
s.add_runtime_dependency(%q<aws-sdk-resources>, ["~> 2"])
s.add_runtime_dependency(%q<concurrent-ruby>, ["~> 0.9"])
s.add_development_dependency(%q<rake>, [">= 0"])
s.add_development_dependency(%q<rspec>, ["~> 3"])
s.add_development_dependency(%q<bundler>, [">= 0"])
@@ -29,7 +29,7 @@ module Dynamoid
extend self

MAX_ITEM_SIZE = 65_536

def configure
block_given? ? yield(Dynamoid::Config) : Dynamoid::Config
end
@@ -42,5 +42,8 @@ def logger
def included_models
@included_models ||= []
end


def adapter
@adapter ||= Adapter.new
end
end
@@ -1,31 +1,42 @@
# require only 'concurrent/atom' once this issue is resolved:
# https://github.com/ruby-concurrency/concurrent-ruby/pull/377
require 'concurrent'

# encoding: utf-8
module Dynamoid

# Adapter's value-add:
# 1) For the rest of Dynamoid, the gateway to DynamoDB.
# 2) Allows switching `config.adapter` to ease development of a new adapter.
# 3) Caches the list of tables Dynamoid knows about.
module Adapter
def self.tables
@tables
class Adapter
def initialize
@adapter_ = Concurrent::Atom.new(nil)
@tables_ = Concurrent::Atom.new(nil)
end

# The actual adapter currently in use: presently AwsSdk.
#
# @since 0.2.0
def self.adapter
reconnect! unless @adapter
@adapter
def tables
if !@tables_.value
@tables_.swap{|value, args| benchmark('Cache Tables') {list_tables}}
end
@tables_.value
end

# Establishes a connection to the underyling adapter and caches all its tables for speedier future lookups. Issued when the adapter is first called.
# The actual adapter currently in use.
#
# @since 0.2.0
def self.reconnect!
require "dynamoid/adapter/#{Dynamoid::Config.adapter}" unless Dynamoid::Adapter.const_defined?(Dynamoid::Config.adapter.camelcase)
@adapter = Dynamoid::Adapter.const_get(Dynamoid::Config.adapter.camelcase).new
@adapter.connect! if @adapter.respond_to?(:connect!)
@tables = benchmark('Cache Tables') {list_tables}
def adapter
if !@adapter_.value
adapter = self.class.adapter_plugin_class.new
adapter.connect! if adapter.respond_to?(:connect!)
@adapter_.compare_and_set(nil, adapter)
clear_cache!
end
@adapter_.value
end

def clear_cache!
@tables_.swap{|value, args| nil}
end

# Shows how long it takes a method to run on the adapter. Useful for generating logged output.
@@ -37,7 +48,7 @@ def self.reconnect!
# @return the result of the yield
#
# @since 0.2.0
def self.benchmark(method, *args)
def benchmark(method, *args)
start = Time.now
result = yield
Dynamoid.logger.info "(#{((Time.now - start) * 1000.0).round(2)} ms) #{method.to_s.split('_').collect(&:upcase).join(' ')}#{ " - #{args.inspect}" unless args.nil? || args.empty? }"
@@ -53,7 +64,7 @@ def self.benchmark(method, *args)
# @return [Object] the persisted object
#
# @since 0.2.0
def self.write(table, object, options = nil)
def write(table, object, options = nil)
put_item(table, object, options)
end

@@ -69,7 +80,7 @@ def self.write(table, object, options = nil)
# unless multiple ids are passed in.
#
# @since 0.2.0
def self.read(table, ids, options = {})
def read(table, ids, options = {})
range_key = options.delete(:range_key)

if ids.respond_to?(:each)
@@ -87,7 +98,7 @@ def self.read(table, ids, options = {})
# @param [Array] ids to delete, can also be a string of just one id
# @param [Array] range_key of the record to delete, can also be a string of just one range_key
#
def self.delete(table, ids, options = {})
def delete(table, ids, options = {})
range_key = options[:range_key] #array of range keys that matches the ids passed in
if ids.respond_to?(:each)
if range_key.respond_to?(:each)
@@ -96,7 +107,7 @@ def self.delete(table, ids, options = {})
else
ids = range_key ? [[ids, range_key]] : ids
end

batch_delete_item(table => ids)
else
delete_item(table, ids, options)
@@ -109,11 +120,11 @@ def self.delete(table, ids, options = {})
# @param [Hash] scan_hash a hash of attributes: matching records will be returned by the scan
#
# @since 0.2.0
def self.scan(table, query, opts = {})
def scan(table, query, opts = {})
benchmark('Scan', table, query) {adapter.scan(table, query, opts)}
end

def self.create_table(table_name, key, options = {})
def create_table(table_name, key, options = {})
if !tables.include?(table_name)
benchmark('Create Table') { adapter.create_table(table_name, key, options) }
tables << table_name
@@ -124,16 +135,16 @@ def self.create_table(table_name, key, options = {})
# Method delegation with benchmark to the underlying adapter. Faster than relying on method_missing.
#
# @since 0.2.0
define_singleton_method(m) do |*args|
define_method(m) do |*args|
benchmark("#{m.to_s}", args) {adapter.send(m, *args)}
end
end

# Delegate all methods that aren't defind here to the underlying adapter.
#
# @since 0.2.0
def self.method_missing(method, *args, &block)
return benchmark(method, *args) {adapter.send(method, *args, &block)} if @adapter.respond_to?(method)
def method_missing(method, *args, &block)
return benchmark(method, *args) {adapter.send(method, *args, &block)} if adapter.respond_to?(method)
super
end

@@ -152,8 +163,19 @@ def self.method_missing(method, *args, &block)
#
# @return [Array] an array of all matching items
#
def self.query(table_name, opts = {})
@adapter.query(table_name, opts)
def query(table_name, opts = {})
adapter.query(table_name, opts)
end

private

def self.adapter_plugin_class
unless Dynamoid.const_defined?(:AdapterPlugin) && Dynamoid::AdapterPlugin.const_defined?(Dynamoid::Config.adapter.camelcase)
require "dynamoid/adapter_plugin/#{Dynamoid::Config.adapter}"
end

Dynamoid::AdapterPlugin.const_get(Dynamoid::Config.adapter.camelcase)
end

end
end
@@ -1,5 +1,5 @@
module Dynamoid
module Adapter
module AdapterPlugin

# The AwsSdkV2 adapter provides support for the aws-sdk version 2 for ruby.
class AwsSdkV2
@@ -53,18 +53,18 @@ def destroy_all

if key_present?
ranges = []
Dynamoid::Adapter.query(source.table_name, range_query).collect do |hash|
Dynamoid.adapter.query(source.table_name, range_query).collect do |hash|
ids << hash[source.hash_key.to_sym]
ranges << hash[source.range_key.to_sym]
end

Dynamoid::Adapter.delete(source.table_name, ids,{:range_key => ranges})
Dynamoid.adapter.delete(source.table_name, ids,{:range_key => ranges})
else
Dynamoid::Adapter.scan(source.table_name, query, scan_opts).collect do |hash|
Dynamoid.adapter.scan(source.table_name, query, scan_opts).collect do |hash|
ids << hash[source.hash_key.to_sym]
end

Dynamoid::Adapter.delete(source.table_name, ids)
Dynamoid.adapter.delete(source.table_name, ids)
end
end

@@ -117,7 +117,7 @@ def records

def records_via_query
Enumerator.new do |yielder|
Dynamoid::Adapter.query(source.table_name, range_query).each do |hash|
Dynamoid.adapter.query(source.table_name, range_query).each do |hash|
yielder.yield source.from_database(hash)
end
end
@@ -139,7 +139,7 @@ def records_via_scan
end

Enumerator.new do |yielder|
Dynamoid::Adapter.scan(source.table_name, query, scan_opts).each do |hash|
Dynamoid.adapter.scan(source.table_name, query, scan_opts).each do |hash|
yielder.yield source.from_database(hash)
end
end
@@ -61,7 +61,7 @@ def hash_key
#
# @since 0.6.1
def count
Dynamoid::Adapter.count(table_name)
Dynamoid.adapter.count(table_name)
end

# Initialize a new object and immediately save it to the database.
@@ -45,7 +45,7 @@ def find(*ids)
# find all the tweets using hash key and range key with consistent read
# Tweet.find_all([['1', 'red'], ['1', 'green']], :consistent_read => true)
def find_all(ids, options = {})
items = Dynamoid::Adapter.read(self.table_name, ids, options)
items = Dynamoid.adapter.read(self.table_name, ids, options)
items ? items[self.table_name].map{|i| from_database(i)} : []
end

@@ -57,7 +57,7 @@ def find_all(ids, options = {})
#
# @since 0.2.0
def find_by_id(id, options = {})
if item = Dynamoid::Adapter.read(self.table_name, id, options)
if item = Dynamoid.adapter.read(self.table_name, id, options)
from_database(item)
else
nil
@@ -95,7 +95,7 @@ def find_by_composite_key(hash_key, range_key, options = {})
# @return [Array] an array of all matching items
#
def find_all_by_composite_key(hash_key, options = {})
Dynamoid::Adapter.query(self.table_name, options.merge({hash_value: hash_key})).collect do |item|
Dynamoid.adapter.query(self.table_name, options.merge({hash_value: hash_key})).collect do |item|
from_database(item)
end
end
@@ -43,7 +43,7 @@ def create_table(options = {})
:range_key => range_key_hash
}.merge(options)

Dynamoid::Adapter.create_table(options[:table_name], options[:id], options)
Dynamoid.adapter.create_table(options[:table_name], options[:id], options)
end

def from_database(attrs = {})
@@ -180,7 +180,7 @@ def update!(conditions = {}, &block)
options = range_key ? {:range_key => dump_field(self.read_attribute(range_key), self.class.attributes[range_key])} : {}

begin
new_attrs = Dynamoid::Adapter.update_item(self.class.table_name, self.hash_key, options.merge(:conditions => conditions)) do |t|
new_attrs = Dynamoid.adapter.update_item(self.class.table_name, self.hash_key, options.merge(:conditions => conditions)) do |t|
if(self.class.attributes[:lock_version])
t.add(lock_version: 1)
end
@@ -216,7 +216,7 @@ def destroy
# @since 0.2.0
def delete
options = range_key ? {:range_key => dump_field(self.read_attribute(range_key), self.class.attributes[range_key])} : {}
Dynamoid::Adapter.delete(self.class.table_name, self.hash_key, options)
Dynamoid.adapter.delete(self.class.table_name, self.hash_key, options)
end

# Dump this object's attributes into hash form, fit to be persisted into the datastore.
@@ -291,7 +291,7 @@ def persist(conditions = nil)
end

begin
Dynamoid::Adapter.write(self.class.table_name, self.dump, conditions)
Dynamoid.adapter.write(self.class.table_name, self.dump, conditions)
@new_record = false
true
rescue Dynamoid::Errors::ConditionalCheckFailedException => e

0 comments on commit ed004b2

Please sign in to comment.
You can’t perform that action at this time.