Skip to content

Commit

Permalink
Split classes and improved executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
AlphaHydrae committed Jan 11, 2014
1 parent 9da6259 commit 4fc820a
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 206 deletions.
186 changes: 10 additions & 176 deletions lib/multi_redis.rb
@@ -1,20 +1,10 @@
require 'ostruct'
require 'redis'
require 'thread'

module MultiRedis
VERSION = '0.2.0'

@redis = nil
@mutex = Mutex.new
@executor = nil

def self.redis= redis
@redis = redis
end

def self.redis
@redis
class << self
attr_accessor :redis
end

def self.execute *args, &block
Expand All @@ -23,181 +13,25 @@ def self.execute *args, &block

executor = @mutex.synchronize do
@executor = Executor.new options
args.each{ |op| @executor.register op }
args.each{ |op| @executor.add op }
yield if block_given?
@executor
end

executor.execute
end

def self.executing?
!!@executor
end

def self.register_operation op, *args
@executor.register op, *args
end

module Extension

def multi_redis_operation symbol, options = {}, &block
op = Operation.new self, options, &block
define_method symbol do |*args|
op.execute *args
end
self
end
end

class Executor

def initialize options = {}
@operations = []
@arguments = []
@redis = options[:redis]
end

def register operation, *args
@operations << operation
@arguments << args
end

def execute options = {}

redis = @redis || MultiRedis.redis
contexts = Array.new(@operations.length){ |i| Context.new redis }
stacks = @operations.collect{ |op| op.steps.dup }
args = stacks.collect.with_index{ |a,i| @arguments[i] || [] }
final_results = Array.new @operations.length

while stacks.any? &:any?

# execute all non-multi steps
stacks.each_with_index do |steps,i|
final_results[i] = steps.shift.execute(contexts[i], args[i]) while steps.first && !steps.first.multi_type
end

# execute all pipelined steps, if any
pipelined_steps = stacks.collect{ |steps| steps.first && steps.first.multi_type == :pipelined ? steps.shift : nil }
if pipelined_steps.any?
results = []
redis.pipelined do
pipelined_steps.each_with_index do |step,i|
if step
final_results[i] = step.execute(contexts[i], args[i])
contexts[i].last_results = redis.client.futures[results.length, redis.client.futures.length]
results += contexts[i].last_results
end
end
end
pipelined_steps.each_with_index{ |step,i| contexts[i].resolve_futures! if step }
end

# execute all multi steps, if any
multi_steps = stacks.collect{ |steps| steps.first && steps.first.multi_type == :multi ? steps.shift : nil }
if multi_steps.any?
results = []
redis.multi do
multi_steps.each_with_index do |step,i|
if step
final_results[i] = step.execute(contexts[i], args[i])
contexts[i].last_results = redis.client.futures[results.length, redis.client.futures.length]
results += contexts[i].last_results
end
end
end
multi_steps.each_with_index{ |step,i| contexts[i].resolve_futures! if step }
end
end

final_results.each_with_index{ |results,i| @operations[i].future.value = results if @operations[i].future }

final_results
end
end

class Operation
attr_reader :steps, :future
private

def initialize *args, &block

options = args.last.kind_of?(Hash) ? args.pop : {}

@target = args.shift || options[:target] || self
@redis = options[:redis]
@steps = []

DSL.new(self).instance_eval &block
end

def execute *args
if MultiRedis.executing?
MultiRedis.register_operation self, *args
@future = Future.new
else
Executor.new(redis: @redis).tap{ |e| e.register self, *args }.execute.first
end
end

def add_step multi_type = nil, &block
@steps << Step.new(@target, multi_type, block)
end

class DSL

def initialize op
@op = op
end

def multi &block
@op.add_step :multi, &block
end

def pipelined &block
@op.add_step :pipelined, &block
end

def run &block
@op.add_step &block
end
end
end

class FutureNotReady < RuntimeError

def initialize
super "Value will be available once the operation executes."
end
end

class Future
FutureNotReady = ::MultiRedis::FutureNotReady.new
attr_writer :value

def initialize
@value = FutureNotReady
end
@mutex = Mutex.new
@executor = nil

def value
raise @value if @value.kind_of? RuntimeError
@value
end
def self.executor
@executor
end

class Step

def initialize target, multi_type, block
@target, @multi_type, @block = target, multi_type, block
end

def execute context, *args
@target.instance_exec *args.unshift(context), &@block
end

def multi_type
@multi_type
end
def self.executing?
!!@executor
end
end

Expand Down
15 changes: 12 additions & 3 deletions lib/multi_redis/context.rb
@@ -1,4 +1,3 @@

module MultiRedis

class Context
Expand All @@ -10,6 +9,15 @@ def initialize redis
@redis = redis
end

def execute shared_results, operation, *args
operation_result = operation.execute self, *args
if @resolve = @redis.client.respond_to?(:futures)
@last_results = @redis.client.futures[shared_results.length, @redis.client.futures.length]
shared_results.concat @last_results
end
operation_result
end

def redis
@redis
end
Expand All @@ -19,8 +27,9 @@ def data
end

def resolve_futures!
@data.contents.each_key do |k|
@data.contents[k] = @data.contents[k].value if @data.contents[k].is_a? Redis::Future
return unless @resolve
@data.each_key do |k|
@data[k] = @data[k].value if @data[k].is_a? Redis::Future
end
@last_results.collect!{ |r| r.is_a?(Redis::Future) ? r.value : r }
end
Expand Down
27 changes: 7 additions & 20 deletions lib/multi_redis/data.rb
@@ -1,27 +1,14 @@

module MultiRedis

class Data
attr_reader :contents

def initialize
@contents = Hash.new
end

def [] k
@contents[k]
end

def []= k, v
@contents[k.to_sym] = v
end
class Data < Hash

def method_missing symbol, *args, &block
if @contents.key? symbol
@contents[symbol]
elsif m = symbol.to_s.match(/\A(.*)\=\Z/)
raise "Reserved name" if respond_to? acc = m[1].to_sym
@contents[acc] = args[0]
if args.empty?
self[symbol]
elsif args.length == 1 && m = symbol.to_s.match(/\A(.*)\=\Z/)
acc = m[1].to_sym
raise ArgumentError, "Cannot set property #{acc}, method ##{acc} already exists" if respond_to? acc
self[acc] = args[0]
else
super symbol, *args, &block
end
Expand Down
95 changes: 95 additions & 0 deletions lib/multi_redis/executor.rb
@@ -0,0 +1,95 @@
module MultiRedis

class Executor

def initialize options = {}
@operations = []
@redis = options[:redis]
end

def add operation, *args
@operations << { op: operation, args: args }
end

def execute options = {}

redis = @redis || MultiRedis.redis

total = 0
execution = Array.new @operations.length do |i|
total += @operations[i][:op].steps.length
OperationExecution.new @operations[i][:op], @operations[i][:args], redis
end

while execution.any?{ |oe| !oe.done? } && total >= 1
total -= 1 # safeguard against infinite loop

execution.each do |oe|
oe.execute_current_step while oe.next? :call
end

TYPES.each do |type|
if execution.any?{ |oe| oe.next? type }
shared_results = [] # TODO: use shared context object
redis.send type do
execution.each do |oe|
oe.execute_current_step shared_results if oe.next? type
end
end
execution.each{ |oe| oe.resolve_futures! }
end
end
end

execution.each{ |oe| oe.resolve_operation_future! }
execution.collect!{ |oe| oe.final_results }
end

private

TYPES = [ :pipelined, :multi ]

class OperationExecution
attr_reader :final_results

def initialize operation, args, redis

@operation = operation
@args = args

@context = Context.new redis
@steps = operation.steps

@current_index = 0
end

def done?
!current_step
end

def next? type
current_step && current_step.type == type
end

def execute_current_step shared_results = nil
results = @context.execute shared_results, current_step, *@args
@current_index += 1
@final_results = results
end

def resolve_futures!
@context.resolve_futures!
end

def resolve_operation_future!
@operation.future.value = @final_results if @operation.future
end

private

def current_step
@steps[@current_index]
end
end
end
end
13 changes: 13 additions & 0 deletions lib/multi_redis/extension.rb
@@ -0,0 +1,13 @@
module MultiRedis

module Extension

def multi_redis_operation symbol, options = {}, &block
op = Operation.new options.merge(target: self), &block
define_method symbol do |*args|
op.execute *args
end
self
end
end
end

0 comments on commit 4fc820a

Please sign in to comment.