diff --git a/lib/multi_redis.rb b/lib/multi_redis.rb index 98399e9..6f5a582 100644 --- a/lib/multi_redis.rb +++ b/lib/multi_redis.rb @@ -1,20 +1,10 @@ -require 'ostruct' require 'redis' -require 'thread' module MultiRedis VERSION = '0.2.0' - @redis = nil - @mutex = Mutex.new - @executor = nil - - def self.redis= redis - @redis = redis - end - - def self.redis - @redis + class << self + attr_accessor :redis end def self.execute *args, &block @@ -23,7 +13,7 @@ def self.execute *args, &block executor = @mutex.synchronize do @executor = Executor.new options - args.each{ |op| @executor.register op } + args.each{ |op| @executor.add op } yield if block_given? @executor end @@ -31,173 +21,17 @@ def self.execute *args, &block executor.execute end - def self.executing? - !!@executor - end - - def self.register_operation op, *args - @executor.register op, *args - end - - module Extension - - def multi_redis_operation symbol, options = {}, &block - op = Operation.new self, options, &block - define_method symbol do |*args| - op.execute *args - end - self - end - end - - class Executor - - def initialize options = {} - @operations = [] - @arguments = [] - @redis = options[:redis] - end - - def register operation, *args - @operations << operation - @arguments << args - end - - def execute options = {} - - redis = @redis || MultiRedis.redis - contexts = Array.new(@operations.length){ |i| Context.new redis } - stacks = @operations.collect{ |op| op.steps.dup } - args = stacks.collect.with_index{ |a,i| @arguments[i] || [] } - final_results = Array.new @operations.length - - while stacks.any? &:any? - - # execute all non-multi steps - stacks.each_with_index do |steps,i| - final_results[i] = steps.shift.execute(contexts[i], args[i]) while steps.first && !steps.first.multi_type - end - - # execute all pipelined steps, if any - pipelined_steps = stacks.collect{ |steps| steps.first && steps.first.multi_type == :pipelined ? steps.shift : nil } - if pipelined_steps.any? - results = [] - redis.pipelined do - pipelined_steps.each_with_index do |step,i| - if step - final_results[i] = step.execute(contexts[i], args[i]) - contexts[i].last_results = redis.client.futures[results.length, redis.client.futures.length] - results += contexts[i].last_results - end - end - end - pipelined_steps.each_with_index{ |step,i| contexts[i].resolve_futures! if step } - end - - # execute all multi steps, if any - multi_steps = stacks.collect{ |steps| steps.first && steps.first.multi_type == :multi ? steps.shift : nil } - if multi_steps.any? - results = [] - redis.multi do - multi_steps.each_with_index do |step,i| - if step - final_results[i] = step.execute(contexts[i], args[i]) - contexts[i].last_results = redis.client.futures[results.length, redis.client.futures.length] - results += contexts[i].last_results - end - end - end - multi_steps.each_with_index{ |step,i| contexts[i].resolve_futures! if step } - end - end - - final_results.each_with_index{ |results,i| @operations[i].future.value = results if @operations[i].future } - - final_results - end - end - - class Operation - attr_reader :steps, :future + private - def initialize *args, &block - - options = args.last.kind_of?(Hash) ? args.pop : {} - - @target = args.shift || options[:target] || self - @redis = options[:redis] - @steps = [] - - DSL.new(self).instance_eval &block - end - - def execute *args - if MultiRedis.executing? - MultiRedis.register_operation self, *args - @future = Future.new - else - Executor.new(redis: @redis).tap{ |e| e.register self, *args }.execute.first - end - end - - def add_step multi_type = nil, &block - @steps << Step.new(@target, multi_type, block) - end - - class DSL - - def initialize op - @op = op - end - - def multi &block - @op.add_step :multi, &block - end - - def pipelined &block - @op.add_step :pipelined, &block - end - - def run &block - @op.add_step &block - end - end - end - - class FutureNotReady < RuntimeError - - def initialize - super "Value will be available once the operation executes." - end - end - - class Future - FutureNotReady = ::MultiRedis::FutureNotReady.new - attr_writer :value - - def initialize - @value = FutureNotReady - end + @mutex = Mutex.new + @executor = nil - def value - raise @value if @value.kind_of? RuntimeError - @value - end + def self.executor + @executor end - class Step - - def initialize target, multi_type, block - @target, @multi_type, @block = target, multi_type, block - end - - def execute context, *args - @target.instance_exec *args.unshift(context), &@block - end - - def multi_type - @multi_type - end + def self.executing? + !!@executor end end diff --git a/lib/multi_redis/context.rb b/lib/multi_redis/context.rb index 8fe114c..b93a807 100644 --- a/lib/multi_redis/context.rb +++ b/lib/multi_redis/context.rb @@ -1,4 +1,3 @@ - module MultiRedis class Context @@ -10,6 +9,15 @@ def initialize redis @redis = redis end + def execute shared_results, operation, *args + operation_result = operation.execute self, *args + if @resolve = @redis.client.respond_to?(:futures) + @last_results = @redis.client.futures[shared_results.length, @redis.client.futures.length] + shared_results.concat @last_results + end + operation_result + end + def redis @redis end @@ -19,8 +27,9 @@ def data end def resolve_futures! - @data.contents.each_key do |k| - @data.contents[k] = @data.contents[k].value if @data.contents[k].is_a? Redis::Future + return unless @resolve + @data.each_key do |k| + @data[k] = @data[k].value if @data[k].is_a? Redis::Future end @last_results.collect!{ |r| r.is_a?(Redis::Future) ? r.value : r } end diff --git a/lib/multi_redis/data.rb b/lib/multi_redis/data.rb index 1c2ce83..b3d3dcc 100644 --- a/lib/multi_redis/data.rb +++ b/lib/multi_redis/data.rb @@ -1,27 +1,14 @@ - module MultiRedis - class Data - attr_reader :contents - - def initialize - @contents = Hash.new - end - - def [] k - @contents[k] - end - - def []= k, v - @contents[k.to_sym] = v - end + class Data < Hash def method_missing symbol, *args, &block - if @contents.key? symbol - @contents[symbol] - elsif m = symbol.to_s.match(/\A(.*)\=\Z/) - raise "Reserved name" if respond_to? acc = m[1].to_sym - @contents[acc] = args[0] + if args.empty? + self[symbol] + elsif args.length == 1 && m = symbol.to_s.match(/\A(.*)\=\Z/) + acc = m[1].to_sym + raise ArgumentError, "Cannot set property #{acc}, method ##{acc} already exists" if respond_to? acc + self[acc] = args[0] else super symbol, *args, &block end diff --git a/lib/multi_redis/executor.rb b/lib/multi_redis/executor.rb new file mode 100644 index 0000000..0ae6df8 --- /dev/null +++ b/lib/multi_redis/executor.rb @@ -0,0 +1,95 @@ +module MultiRedis + + class Executor + + def initialize options = {} + @operations = [] + @redis = options[:redis] + end + + def add operation, *args + @operations << { op: operation, args: args } + end + + def execute options = {} + + redis = @redis || MultiRedis.redis + + total = 0 + execution = Array.new @operations.length do |i| + total += @operations[i][:op].steps.length + OperationExecution.new @operations[i][:op], @operations[i][:args], redis + end + + while execution.any?{ |oe| !oe.done? } && total >= 1 + total -= 1 # safeguard against infinite loop + + execution.each do |oe| + oe.execute_current_step while oe.next? :call + end + + TYPES.each do |type| + if execution.any?{ |oe| oe.next? type } + shared_results = [] # TODO: use shared context object + redis.send type do + execution.each do |oe| + oe.execute_current_step shared_results if oe.next? type + end + end + execution.each{ |oe| oe.resolve_futures! } + end + end + end + + execution.each{ |oe| oe.resolve_operation_future! } + execution.collect!{ |oe| oe.final_results } + end + + private + + TYPES = [ :pipelined, :multi ] + + class OperationExecution + attr_reader :final_results + + def initialize operation, args, redis + + @operation = operation + @args = args + + @context = Context.new redis + @steps = operation.steps + + @current_index = 0 + end + + def done? + !current_step + end + + def next? type + current_step && current_step.type == type + end + + def execute_current_step shared_results = nil + results = @context.execute shared_results, current_step, *@args + @current_index += 1 + @final_results = results + end + + def resolve_futures! + @context.resolve_futures! + end + + def resolve_operation_future! + @operation.future.value = @final_results if @operation.future + end + + private + + def current_step + @steps[@current_index] + end + end + end +end diff --git a/lib/multi_redis/extension.rb b/lib/multi_redis/extension.rb new file mode 100644 index 0000000..b330619 --- /dev/null +++ b/lib/multi_redis/extension.rb @@ -0,0 +1,13 @@ +module MultiRedis + + module Extension + + def multi_redis_operation symbol, options = {}, &block + op = Operation.new options.merge(target: self), &block + define_method symbol do |*args| + op.execute *args + end + self + end + end +end diff --git a/lib/multi_redis/future.rb b/lib/multi_redis/future.rb new file mode 100644 index 0000000..9bd1f90 --- /dev/null +++ b/lib/multi_redis/future.rb @@ -0,0 +1,23 @@ +module MultiRedis + + class FutureNotReady < RuntimeError + + def initialize + super "Value will be available once the operation executes." + end + end + + class Future + FutureNotReady = ::MultiRedis::FutureNotReady.new + attr_writer :value + + def initialize value = nil + @value = value || FutureNotReady + end + + def value + raise @value if @value.kind_of? RuntimeError + @value + end + end +end diff --git a/lib/multi_redis/operation.rb b/lib/multi_redis/operation.rb new file mode 100644 index 0000000..3f901d2 --- /dev/null +++ b/lib/multi_redis/operation.rb @@ -0,0 +1,61 @@ +module MultiRedis + + class Operation + attr_accessor :redis + attr_reader :steps, :future + + def initialize options = {}, &block + + @target = options[:target] || self + @redis = options[:redis] + @steps = [] + + configure &block if block + end + + def configure &block + DSL.new(self).instance_eval &block + end + + def execute *args + if MultiRedis.executing? + MultiRedis.executor.add self, *args + @future = Future.new + else + e = Executor.new redis: @redis + e.add self, *args + e.execute.first.tap do |result| + @future = Future.new result + end + end + end + + def add type, &block + raise ArgumentError, "Unknown type #{type}, must be one of #{TYPES.join ', '}." unless TYPES.include? type + @steps << Step.new(@target, type, block) + end + + private + + TYPES = [ :call, :pipelined, :multi ] + + class DSL + + def initialize op + @op = op + end + + def multi &block + @op.add :multi, &block + end + + def pipelined &block + @op.add :pipelined, &block + end + + def run &block + @op.add :call, &block + end + end + end +end diff --git a/lib/multi_redis/step.rb b/lib/multi_redis/step.rb new file mode 100644 index 0000000..7e58342 --- /dev/null +++ b/lib/multi_redis/step.rb @@ -0,0 +1,17 @@ +module MultiRedis + + class Step + + def initialize target, type, block + @target, @type, @block = target, type, block + end + + def execute context, *args + @target.instance_exec *args.unshift(context), &@block + end + + def type + @type + end + end +end diff --git a/spec/multi_redis_spec.rb b/spec/demo_spec.rb similarity index 92% rename from spec/multi_redis_spec.rb rename to spec/demo_spec.rb index 434a171..1dd313d 100644 --- a/spec/multi_redis_spec.rb +++ b/spec/demo_spec.rb @@ -11,7 +11,7 @@ expect($redis.client).not_to receive(:call) expect($redis).to receive(:multi).twice.and_call_original - m = MultiRedis::Operation.new self, redis: $redis do + m = MultiRedis::Operation.new target: self, redis: $redis do multi do |mr| @@ -59,7 +59,7 @@ expect($redis.client).not_to receive(:call) expect($redis).to receive(:multi).once.and_call_original - op1 = MultiRedis::Operation.new self, redis: $redis do + op1 = MultiRedis::Operation.new target: self, redis: $redis do multi do |mr| mr.data.a = mr.redis.get key('foo') @@ -75,7 +75,7 @@ end end - op2 = MultiRedis::Operation.new self, redis: $redis do + op2 = MultiRedis::Operation.new target: self, redis: $redis do multi do |mr| mr.data.c = mr.redis.get key('baz') @@ -90,8 +90,8 @@ end executor = MultiRedis::Executor.new redis: $redis - executor.register op1 - executor.register op2 + executor.add op1 + executor.add op2 results = executor.execute expect(results).to eq([ 'result1', 'result2' ]) diff --git a/spec/helper.rb b/spec/helper.rb index 39b0c73..d1ba77e 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -13,6 +13,10 @@ require 'coveralls' Coveralls.wear! +SimpleCov.configure do + add_filter 'spec/support' +end + SimpleCov.formatter = SimpleCov::Formatter::MultiFormatter[ SimpleCov::Formatter::HTMLFormatter, Coveralls::SimpleCov::Formatter @@ -21,7 +25,7 @@ require 'rspec' require 'set' -Dir[File.dirname(__FILE__) + "/support/**/*.rb"].each { |f| require f } +Dir[File.dirname(__FILE__) + "/support/**/*.rb"].each{ |f| require f } RSpec.configure do |config| diff --git a/spec/multi_redis/context_spec.rb b/spec/multi_redis/context_spec.rb new file mode 100644 index 0000000..e69de29 diff --git a/spec/multi_redis/data_spec.rb b/spec/multi_redis/data_spec.rb new file mode 100644 index 0000000..10abb16 --- /dev/null +++ b/spec/multi_redis/data_spec.rb @@ -0,0 +1,32 @@ +require 'helper' + +describe MultiRedis::Data do + + it "should start empty" do + expect(subject).to be_empty + end + + it "should be a hash" do + expect(subject).to be_a_kind_of(Hash) + end + + it "should behave like an open structure" do + + expect(subject.foo).to be_nil + expect(subject.foo = 'bar').to eq('bar') + expect(subject.foo).to eq('bar') + expect(subject[:foo]).to eq('bar') + + subject[:baz] = 'qux' + expect(subject.baz).to eq('qux') + end + + it "should not allow existing methods to be overriden" do + expect{ subject.to_s = 'foo' }.to raise_error(ArgumentError, "Cannot set property to_s, method #to_s already exists") + end + + it "should still fail for unknown methods" do + expect{ subject.foo 'bar' }.to raise_error(NoMethodError) + expect{ subject.send :baz=, :ham, :eggs }.to raise_error(NoMethodError) + end +end diff --git a/spec/multi_redis/future_spec.rb b/spec/multi_redis/future_spec.rb new file mode 100644 index 0000000..094d641 --- /dev/null +++ b/spec/multi_redis/future_spec.rb @@ -0,0 +1,15 @@ + +describe MultiRedis::Future do + + describe "#value" do + + it "should raise an error if the value has not yet been set" do + expect{ subject.value }.to raise_error(MultiRedis::FutureNotReady, "Value will be available once the operation executes.") + end + + it "should return the value once set" do + subject.value = 'foo' + expect(subject.value).to eq('foo') + end + end +end diff --git a/spec/multi_redis/operation_spec.rb b/spec/multi_redis/operation_spec.rb new file mode 100644 index 0000000..1923d4c --- /dev/null +++ b/spec/multi_redis/operation_spec.rb @@ -0,0 +1,113 @@ +require 'helper' + +describe MultiRedis::Operation do + Step ||= MultiRedis::Step + Future ||= MultiRedis::Future + Executor ||= MultiRedis::Executor + + before :each do + Step.stub(:new){ |*args| args } + end + + it "should define steps" do + + blocks = Array.new(6){ |i| lambda{} } + + op = described_class.new do + run &blocks[0] + pipelined &blocks[1] + end + + op.configure do + multi &blocks[2] + pipelined &blocks[3] + end + + op.add :multi, &blocks[4] + op.add :multi, &blocks[5] + + expect(op.steps).to have(6).items + expect(op.steps[0]).to eq([ op, :call, blocks[0] ]) + expect(op.steps[1]).to eq([ op, :pipelined, blocks[1] ]) + expect(op.steps[2]).to eq([ op, :multi, blocks[2] ]) + expect(op.steps[3]).to eq([ op, :pipelined, blocks[3] ]) + expect(op.steps[4]).to eq([ op, :multi, blocks[4] ]) + expect(op.steps[5]).to eq([ op, :multi, blocks[5] ]) + end + + it "should allow the target to be set through options" do + + target = Object.new + blocks = Array.new(3){ |i| lambda{} } + op = described_class.new target: target do + run &blocks[0] + pipelined &blocks[1] + multi &blocks[2] + end + + expect(op.steps).to have(3).items + expect(op.steps[0]).to eq([ target, :call, blocks[0] ]) + expect(op.steps[1]).to eq([ target, :pipelined, blocks[1] ]) + expect(op.steps[2]).to eq([ target, :multi, blocks[2] ]) + end + + describe "#add" do + + it "should refuse unknown types" do + expect{ subject.add :foo, &lambda{} }.to raise_error(ArgumentError, "Unknown type foo, must be one of call, pipelined, multi.") + end + end + + describe "#execute" do + let(:result){ double } + let(:executor){ double add: nil, execute: [ result ] } + let(:executing){ false } + let(:future){ double } + + before :each do + Executor.stub new: executor unless executing + MultiRedis.stub executing?: executing + Future.stub new: future + end + + it "should execute the operation through an executor" do + + expect(Executor).to receive(:new).with(redis: nil) + expect(executor).to receive(:add).with(subject, :foo, :bar) + expect(Future).to receive(:new).with(result) + + expect(subject.execute(:foo, :bar)).to be(result) + expect(subject.future).to be(future) + end + + describe "in a multi redis execution block" do + let(:executing){ true } + + before :each do + MultiRedis.stub executor: executor + end + + it "should add the operation to the multi redis executor" do + + expect(executor).to receive(:add).with(subject, :baz, :qux) + expect(Future).to receive(:new).with(no_args) + + expect(subject.execute(:baz, :qux)).to be(future) + expect(subject.future).to be(future) + end + end + end + + describe "redis client" do + let(:redis){ double } + + it "should be configurable through options" do + expect(described_class.new(redis: redis).redis).to be(redis) + end + + it "should be configurable through an accessor" do + subject.redis = redis + expect(subject.redis).to eq(redis) + end + end +end diff --git a/spec/version_spec.rb b/spec/multi_redis/version_spec.rb similarity index 76% rename from spec/version_spec.rb rename to spec/multi_redis/version_spec.rb index 5fdc96d..204237e 100644 --- a/spec/version_spec.rb +++ b/spec/multi_redis/version_spec.rb @@ -3,6 +3,6 @@ describe "Version" do it "should be correct" do - expect(MultiRedis::VERSION).to eq(File.read(File.join(File.dirname(__FILE__), '..', 'VERSION'))) + expect(MultiRedis::VERSION).to eq(File.read(File.join(File.dirname(__FILE__), '..', '..', 'VERSION'))) end end