Skip to content

Commit

Permalink
Get back loop in the View fetcher
Browse files Browse the repository at this point in the history
Change-Id: I4b86184226e995c91402cf8422f200d20f507d39
Reviewed-on: http://review.couchbase.org/23473
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information
funny-falcon authored and avsej committed Dec 27, 2012
1 parent 7b60690 commit da420c7
Showing 1 changed file with 142 additions and 92 deletions.
234 changes: 142 additions & 92 deletions lib/couchbase/view.rb
Expand Up @@ -65,62 +65,78 @@ class ArrayWithTotalRows < Array # :nodoc:
attr_accessor :total_rows
end

class Synchronizer # :nodoc:
class AsyncHelper # :nodoc:
include Constants
EMPTY = []
def initialize(wrapper_class, bucket, &block)

def initialize(wrapper_class, bucket, include_docs, quiet, block)
@wrapper_class = wrapper_class
@bucket = bucket
@block = block
@quiet = quiet
@include_docs = include_docs
@queue = []
@first = @shift = 0
@block = block
@completed = false
end

def <<(obj)
@queue << obj
# Register object in the emitter.
def push(obj)
if @include_docs
@queue << obj
@bucket.get(obj[S_ID], :extended => true, :quiet => @quiet) do |res|
obj[S_DOC] = {
S_VALUE => res.value,
S_META => {
S_ID => obj[S_ID],
S_FLAGS => res.flags,
S_CAS => res.cas
}
}
check_for_ready_documents
end
else
old_obj = @queue.shift
@queue << obj
block_call(old_obj) if old_obj
end
end

def complete!
if @include_docs
@completed = true
elsif !@queue.empty?
obj = @queue.shift
obj[S_IS_LAST] = true
block_call obj
end
end

private

def block_call(obj)
@block.call @wrapper_class.wrap(@bucket, obj)
end

def check!
def check_for_ready_documents
shift = @shift
while @first < @queue.size + shift
obj = @queue[@first - shift]
queue = @queue
while @first < queue.size + shift
obj = queue[@first - shift]
break unless obj[S_DOC]
@queue[@first - shift] = nil
queue[@first - shift] = nil
@first += 1
if @completed && @first == @queue.size + shift
if @completed && @first == queue.size + shift
obj[S_IS_LAST] = true
end
@block.call @wrapper_class.wrap(@bucket, obj)
block_call obj
end
if @first - shift > @queue.size / 2
@queue[0, @first - shift] = EMPTY
if @first - shift > queue.size / 2
queue[0, @first - shift] = EMPTY
@shift = @first
end
end

def completed!
@completed = true
end
end

class Proxy # :nodoc:
def initialize(wrapper_class, bucket, array)
@wrapper_class = wrapper_class
@bucket = bucket
@array = array
end

def <<(obj)
@array << @wrapper_class.wrap(@bucket, obj)
end

def check!
end

def completed!
@array.last.instance_variable_set(:@last, true)
end
end

attr_reader :params
Expand Down Expand Up @@ -173,12 +189,19 @@ def initialize(bucket, endpoint, params = {})
#
def each(params = {})
return enum_for(:each, params) unless block_given?
if @bucket.async?
raise ArgumentError, "CouchBase::View#each should not be used in asynchronous mode"
end
fetch(params) {|doc| yield(doc)}
end

def first(params = {})
params = params.merge(:limit => 1)
fetch(params).first
end

def take(n, params = {})
params = params.merge(:limit => n)
fetch(params)
end

# Registers callback function for handling error objects in view
# results stream.
#
Expand Down Expand Up @@ -326,11 +349,7 @@ def on_error(&callback)
# doc.recent_posts_with_comments(:start_key => [post_id, 0],
# :end_key => [post_id, 1],
# :include_docs => true)
def fetch(params = {})
if @bucket.async? && !block_given?
raise ArgumentError, "Could not call View#fetch without block on asynchronous connection"
end

def fetch(params = {}, &block)
params = @params.merge(params)
include_docs = params.delete(:include_docs)
quiet = params.delete(:quiet){ true }
Expand All @@ -343,82 +362,113 @@ def fetch(params = {})
path = Utils.build_query(@endpoint, params)
request = @bucket.make_http_request(path, options)

if @bucket.async?
if block
fetch_async(request, include_docs, quiet, block)
end
else
fetch_sync(request, include_docs, quiet, block)
end
end


# Returns a string containing a human-readable representation of the {View}
#
# @return [String]
def inspect
%(#<#{self.class.name}:#{self.object_id} @endpoint=#{@endpoint.inspect} @params=#{@params.inspect}>)
end

private

def send_error(*args)
if @on_error
@on_error.call(*args.take(2))
else
raise Error::View.new(*args)
end
end

def fetch_async(request, include_docs, quiet, block)
filter = ["/rows/", "/errors/"]
filter << "/total_rows" unless block_given?
parser = YAJI::Parser.new(:filter => filter, :with_path => true)
helper = AsyncHelper.new(@wrapper_class, @bucket, include_docs, quiet, block)

if block_given?
if include_docs
sync = Synchronizer.new(@wrapper_class, @bucket){|doc| yield doc}
end
else
docs = ArrayWithTotalRows.new
if include_docs
sync = Proxy.new(@wrapper_class, @bucket, docs)
request.on_body do |chunk|
if chunk.success?
parser << chunk.value if chunk.value
helper.complete! if chunk.completed?
else
send_error("http_error", chunk.error)
end
end

request.on_body do |chunk|
if chunk.error
if @on_error
@on_error.call("http_error", chunk.error)
else
raise Error::View.new("http_error", chunk.error, nil)
end
parser.on_object do |path, obj|
case path
when "/errors/"
from, reason = obj["from"], obj["reason"]
send_error(from, reason)
else
parser << chunk.value if chunk.value
sync.completed! if include_docs && chunk.completed?
helper.push(obj)
end
end

request.perform
nil
end

def fetch_sync(request, include_docs, quiet, block)
res = []
filter = ["/rows/", "/errors/"]
unless block
filter << "/total_rows"
docs = ArrayWithTotalRows.new
end
parser = YAJI::Parser.new(:filter => filter, :with_path => true)
last_chunk = nil

request.on_body do |chunk|
last_chunk = chunk
res << chunk.value if chunk.success?
end

parser.on_object do |path, obj|
case path
when "/total_rows"
# if total_rows key present, save it and take next object
docs.total_rows = obj
when "/errors/"
from, reason = obj["from"], obj["reason"]
if @on_error
@on_error.call(from, reason)
else
raise Error::View.new(from, reason)
end
send_error(from, reason)
else
if include_docs
sync << obj
@bucket.get(obj[S_ID], :extended => true, :quiet => quiet) do |res|
obj[S_DOC] = {
S_VALUE => res.value,
S_META => {
S_ID => obj[S_ID],
S_FLAGS => res.flags,
S_CAS => res.cas
}
val, flags, cas = @bucket.get(obj[S_ID], :extended => true, :quiet => quiet)
obj[S_DOC] = {
S_VALUE => val,
S_META => {
S_ID => obj[S_ID],
S_FLAGS => flags,
S_CAS => cas
}
sync.check!
end
else
doc = @wrapper_class.wrap(@bucket, obj)
block_given? ? (yield doc) : docs << doc
}
end
doc = @wrapper_class.wrap(@bucket, obj)
block ? block.call(doc) : docs << doc
end
end

if @bucket.async?
request.perform
request.continue

if last_chunk.success?
while value = res.shift
parser << value
end
else
@bucket.run { request.perform }
send_error("http_error", last_chunk.error, nil)
end
# return nil for call with block
block_given? ? nil : docs
end


# Returns a string containing a human-readable representation of the {View}
#
# @return [String]
def inspect
%(#<#{self.class.name}:#{self.object_id} @endpoint=#{@endpoint.inspect} @params=#{@params.inspect}>)
# return nil for call with block
docs
end
end
end

0 comments on commit da420c7

Please sign in to comment.