Skip to content

Commit

Permalink
handle websocket disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
erubboli committed Dec 16, 2016
1 parent b298bf1 commit cbc3508
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 39 deletions.
1 change: 0 additions & 1 deletion bitfinex-rb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'faye-websocket', '~> 0.10.3'
spec.add_runtime_dependency 'json', '~> 1.8.3','>= 1.8.3'
spec.add_runtime_dependency 'faraday_middleware', '~> 0.10', '>= 0.10.0'
spec.add_development_dependency "bundler", '~> 1.9.2', '>= 1.8.0'
end
9 changes: 9 additions & 0 deletions examples/ticker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require_relative "../lib/bitfinex.rb"

client = Bitfinex::Client.new

client.listen_ticker("ETHUSD") do |response|
puts response.join(", ")
end

client.listen!
7 changes: 5 additions & 2 deletions lib/bitfinex/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ def config
end

class Configuration
attr_accessor :api_endpoint, :debug, :debug_connection, :secret, :api_key, :websocket_api_endpoint
attr_accessor :api_endpoint, :debug, :debug_connection, :secret, :api_key, :websocket_api_endpoint, :reconnect, :reconnect_after

def initialize
self.api_endpoint = "https://api.bitfinex.com/v1/"
self.websocket_api_endpoint = "wss://api2.bitfinex.com:3000/ws"
self.websocket_api_endpoint = "wss://api.bitfinex.com/ws"
self.debug = false
self.reconnect = true
self.reconnect_after = 30

self.debug_connection = false
end
Expand Down
1 change: 1 addition & 0 deletions lib/bitfinex/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class ParamsError < ClientError; end
class InvalidAuthKeyError < ClientError; end
class BlockMissingError < ParamsError; end
class ServerError < Exception; end # Error reported back by Binfinex server
class ConnectionClosed < Exception; end
class BadRequestError < ServerError; end
class NotFoundError < ServerError; end
class ForbiddenError < ServerError; end
Expand Down
2 changes: 1 addition & 1 deletion lib/bitfinex/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Bitfinex
VERSION = "0.0.8"
VERSION = "0.0.9"
end
90 changes: 55 additions & 35 deletions lib/bitfinex/websocket_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def listen!
listen
ws_client.run!
end

def ws_send(msg)
ws_client.send msg
end
Expand All @@ -28,7 +28,7 @@ def ws_auth(&block)
add_callback(:auth, &block)
save_channel_id(:auth, 0)
ws_safe_send({
apiKey: config.api_key,
apiKey: config.api_key,
authSig: sign(payload),
authPayload: payload,
event: 'auth'
Expand All @@ -40,8 +40,8 @@ def ws_auth(&block)
def ws_unauth
ws_safe_send({event: 'unauth'})
end
private

private

def ws_reset_channels
@chan_ids = []
Expand All @@ -50,25 +50,30 @@ def ws_reset_channels
end

def ws_client
@ws_client ||= WSClient.new(url:config.websocket_api_endpoint)
options = {
url: config.websocket_api_endpoint,
reconnect: config.reconnect,
reconnect_after: config.reconnect_after
}
@ws_client ||= WSClient.new(options)
end

def chan_ids
@chan_ids ||= []
end
end

def ws_open
@ws_open ||= false
end

def ws_registration_messages
@ws_registration_messages ||= []
end

def callbacks
@callbacks ||= {}
end

def add_callback(channel, &block)
callbacks[channel] = { block: block, chan_id: nil }
end
Expand All @@ -84,7 +89,7 @@ def ws_safe_send(msg)
else
ws_registration_messages.push msg
end
end
end

def register_channel(msg, &block)
add_callback(fingerprint(msg),&block)
Expand All @@ -94,38 +99,38 @@ def register_channel(msg, &block)
ws_registration_messages.push msg.merge(event: 'subscribe')
end
end

def fingerprint(msg)
msg.reject{|k,v| [:event,'chanId','event'].include?(k) }.
inject({}){|h, (k,v)| h[k.to_sym]=v.to_s; h}
end

def listen
ws_client.on(:message) do |rmsg|
msg = JSON.parse(rmsg)
if msg.kind_of?(Hash) && msg["event"] == "subscribed"
save_channel_id(fingerprint(msg), msg["chanId"])
save_channel_id(fingerprint(msg), msg["chanId"])
elsif msg.kind_of?(Array)
exec_callback_for(msg)
end
end
end

def save_channel_id(chan,id)
callbacks[chan][:chan_id] = id
chan_ids[id.to_i] = chan
end

def exec_callback_for(msg)
return if msg[1] == 'hb' #ignore heartbeat
id = msg[0]
callbacks[chan_ids[id.to_i]][:block].call(msg)
end

def subscribe_to_channels
ws_client.on(:open) do
ws_client.on(:open) do
@ws_open = true
ws_registration_messages.each do |msg|
ws_registration_messages.each do |msg|
ws_client.send(msg)
end
end
Expand All @@ -134,54 +139,69 @@ def subscribe_to_channels
class WSClient
def initialize(options = {})
# set some defaults
@url = options[:url] || 'ws://dev2.bitfinex.com:3001/ws'
@reconnect = options[:reconenct] || false
@url = options[:url] || 'wss://api.bitfinex.com/ws'
@reconnect = options[:reconnect] || false
@reconnect_after = options[:reconnect_after] || 30
@stop = false
end

def on(msg, &blk)
ivar = "@#{msg}_cb"
instance_variable_set(ivar.to_sym, blk)
end

def run!
if EventMachine.reactor_running?
connect!
else
EM.run { connect! }
end
end

def stop!
@stop = true
@ws.close
end

def connect!
@stop = false
@ws = Faye::WebSocket::Client.new(@url)
@ws.onopen = method(:ws_opened)
@ws.onmessage = method(:ws_receive)
@ws.onclose = method(:ws_closed)
@ws.onerror = method(:ws_error)
end

def send(msg)
raise ConnectionClosed if stopped?
connect! unless alive?
msg = msg.is_a?(Hash) ? msg.to_json : msg
@ws.send(msg)
end


def alive?
@ws && @ws.ready_state == Faye::WebSocket::API::OPEN
end

def stopped?
@stop
end

private

def ws_opened(event)
@open_cb.call(event) if @open_cb
end

def ws_receive(event)
@message_cb.call(event.data) if @message_cb
end

def ws_closed(_event)
EM.stop
end


def ws_closed(event)
return unless @reconnect
EM.add_timer(@reconnect_after){ connect! } unless @stop
end

def ws_error(event)
fail event
end
Expand Down

0 comments on commit cbc3508

Please sign in to comment.