Skip to content

Commit

Permalink
implement actioncable streams using #async_run
Browse files Browse the repository at this point in the history
  • Loading branch information
tinco committed Jul 24, 2016
1 parent 483972a commit 3a3e130
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
56 changes: 56 additions & 0 deletions lib/no_brainer/streams.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
module NoBrainer::Streams
extend ActiveSupport::Concern

included do
on_unsubscribe :stop_all_streams
end

private

def stream_from(query, options = {}, callback = nil)
callback ||= -> (changes) do
transmit changes, via: "streamed from #{query.inspect}"
end

# defer_subscription_confirmation!
connection = NoBrainer::Streams::streams_connection
cursor = query.to_rql.changes(options).async_run(connection, ConcurrentAsyncHandler, &callback)
cursors << cursor
end

def stop_all_streams
cursors.each do |cursor|
begin
logger.info "Closing cursor: #{cursor.inspect}"
cursor.close
rescue => e
logger.error "Could not close cursor: #{e.message}\n#{e.backtrace.join("\n")}"
end
end
end

def cursors
@_cursors ||= []
end

def self.streams_connection
@@streams_connection ||= NoBrainer::ConnectionManager.get_new_connection.raw
end

class ConcurrentAsyncHandler < RethinkDB::AsyncHandler
def run(&action)
options[:query_handle_class] = AsyncQueryHandler
yield
end

def handler
callback
end

class AsyncQueryHandler < RethinkDB::QueryHandle
def guarded_async_run(&b)
Concurrent.global_io_executor.post(&b)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/nobrainer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module NoBrainer
# We eager load things that could be loaded when handling the first web request.
# Code that is loaded through the DSL of NoBrainer should not be eager loaded.
autoload :Document, :IndexManager, :Loader, :Fork, :Geo, :SymbolDecoration
eager_autoload :Config, :Connection, :ConnectionManager, :Error,
eager_autoload :Config, :Connection, :ConnectionManager, :Error, :Streams,
:QueryRunner, :Criteria, :RQL, :Lock, :ReentrantLock, :Profiler, :System

class << self
Expand Down

0 comments on commit 3a3e130

Please sign in to comment.