Skip to content

Commit

Permalink
Refactoring streamer code.
Browse files Browse the repository at this point in the history
  • Loading branch information
samlown committed Apr 5, 2011
1 parent a4df951 commit 0a62fdc
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 41 deletions.
49 changes: 29 additions & 20 deletions lib/couchrest/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(server, name)
@host = server.uri
@uri = "/#{name.gsub('/','%2F')}"
@root = host + uri
@streamer = Streamer.new(self)
@streamer = Streamer.new
@bulk_save_cache = []
@bulk_save_cache_limit = 500 # must be smaller than the uuid count
end
Expand All @@ -34,16 +34,6 @@ def info
CouchRest.get @root
end

# Query the <tt>_all_docs</tt> view. Accepts all the same arguments as view.
def documents(params = {})
keys = params.delete(:keys)
url = CouchRest.paramify_url "#{@root}/_all_docs", params
if keys
CouchRest.post(url, {:keys => keys})
else
CouchRest.get url
end
end

# Query a CouchDB-Lucene search view
def search(name, params={})
Expand Down Expand Up @@ -75,21 +65,40 @@ def slow_view(funcs, params = {})
# paramaters as described in http://wiki.apache.org/couchdb/HttpViewApi
def view(name, params = {}, &block)
keys = params.delete(:keys)
name = name.split('/') # I think this will always be length == 2, but maybe not...
dname = name.shift
vname = name.join('/')
url = CouchRest.paramify_url "#{@root}/_design/#{dname}/_view/#{vname}", params
if keys
CouchRest.post(url, {:keys => keys})
# Try recognising the name, otherwise assume already prepared
view_path = name =~ /^([^_].+?)\/(.*)$/ ? "_design/#{$1}/_view/#{$2}" : name
url = CouchRest.paramify_url "#{@root}/#{view_path}"
if block_given?
if keys
@streamer.post(url, {:keys => keys}, &block)
else
@streamer.get(url, &block)
end
else
if block_given?
@streamer.view("_design/#{dname}/_view/#{vname}", params, &block)
if keys
CouchRest.post(url, {:keys => keys})
else
CouchRest.get url
end
end
end


# Query the <tt>_all_docs</tt> view. Accepts all the same arguments as view.
def documents(params = {}, &block)
view("_all_docs", params, &block)
end
alias documents all_docs

# Query CouchDB's special <tt>_changes</tt> feed for the latest.
# All standard CouchDB options can be provided.
#
# Warning: sending :feed => 'continuous' will cause your code to block
# indefinetly while waiting for changes. You might want to look-up an
# alternative to this.
def changes(params = {}, &block)
view("_changes", params, &block)
end

# GET a document from CouchDB, by id. Returns a Ruby Hash.
def get(id, params = {})
slug = escape_docid(id)
Expand Down
43 changes: 23 additions & 20 deletions lib/couchrest/helper/streamer.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
module CouchRest
class Streamer
attr_accessor :db
def initialize db
@db = db

attr_accessor :default_curl_opts

def initialize
self.default_curl_opts = "--silent --no-buffer --tcp-nodelay"
end

# Stream a view, yielding one row at a time. Shells out to <tt>curl</tt> to keep RAM usage low when you have millions of rows.
def view name, params = nil, &block
urlst = if /^_/.match(name) then
"#{@db.root}/#{name}"
else
name = name.split('/')
dname = name.shift
vname = name.join('/')
"#{@db.root}/_design/#{dname}/_view/#{vname}"
end
url = CouchRest.paramify_url urlst, params
# puts "stream #{url}"

def view(*args)
raise "CouchRest::Streamer#view is depricated. Please use Database#view with block."
end

def get(url, &block)
open_pipe("curl #{default_curl_opts} \"#{url}\"", &block)
end

def post(url, params = {}, &block)
open_pipe("curl #{default_curl_opts} -d \"#{CGI.encode(params.to_json)}\" \"#{url}\"", &block)
end

protected

def open_pipe(cmd, &block)
first = nil
IO.popen("curl --silent '#{url}'") do |view|
IO.popen(cmd) do |view|
first = view.gets # discard header
while line = view.gets
row = parse_line(line)
Expand All @@ -27,9 +32,7 @@ def view name, params = nil, &block
end
parse_first(first)
end

private


def parse_line line
return nil unless line
if /(\{.*\}),?/.match(line.chomp)
Expand Down
2 changes: 1 addition & 1 deletion spec/couchrest/helpers/streamer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
@db = @cr.database(TESTDB)
@db.delete! rescue nil
@db = @cr.create_db(TESTDB) rescue nil
@streamer = CouchRest::Streamer.new(@db)
@streamer = CouchRest::Streamer.new()
@docs = (1..1000).collect{|i| {:integer => i, :string => i.to_s}}
@db.bulk_save(@docs)
@db.save_doc({
Expand Down

0 comments on commit 0a62fdc

Please sign in to comment.