/
tradeking.rb
181 lines (147 loc) · 4.95 KB
/
tradeking.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
require 'trading_core/quote_streamer/base'
require 'date'
require 'time'
module QuoteStreamer
class Tradeking < Base
def initialize(account)
super(account)
@streaming = false
@symbols = []
@stopping = false
end
def stream_quotes(symbols, &block)
EventMachine.add_periodic_timer(1) do
start_streaming_quotes(symbols, &block) if !@streaming && !@stopping && market_is_open
end
end
def stop
puts 'Streaming stopped'
@streaming = false
@stopping = true
@http.close
end
def live?
true
end
private
def start_streaming_quotes(symbols, &block)
return if @streaming
@streaming = true
symbols = [symbols].flatten
@symbols.concat(symbols.dup).uniq!
previous_data = ''
symbol_data = {}
begin
@account.api.quotes(symbols).each do |quote|
symbol_data[quote['symbol']] = quote
end
@http.close if @http
@http = nil
@http = stream(symbols)
rescue => error
# Unable to connect to API. Return to caller.
puts 'Unable to connect to API...trying again in 30 seconds...'
sleep 30
@streaming = false
return
end
puts 'Starting Tradeking streaming...'
@http.stream do |data|
self.stop if !market_is_open
json_data = nil
data = data.gsub("\n", '')
begin
# Don't try to combine previous data and current data if the current data is
# potentially a full JSON literal.
previous_data = '' if data[0] == '{'
json_data = JSON.parse(previous_data + data)
previous_data = ''
rescue => error
# Only track data for combination with the next data chunk if the current
# data is potentially a full JSON literal.
if data[0] == '{'
previous_data = data
else
previous_data = ''
end
next
end
begin
next if json_data['status'] && json_data['status'] == 'connected'
quote = json_data['quote']
trade = json_data['trade']
symbol = nil
original_symbol_data = symbol_data[quote['symbol']]
# Update quote data.
if quote
next if !symbol_data[quote['symbol']]
symbol = quote['symbol']
symbol_data[quote['symbol']].merge!({
'ask_price' => quote['ask'].to_f,
'bid_price' => quote['bid'].to_f
})
end
# Update trade data.
if trade
next if !symbol_data[trade['symbol']]
next if trade['last'].to_f == 0.0
symbol = trade['symbol']
change = (trade['last'].to_f - symbol_data[symbol]['previous_close'].to_f).round(2)
change_percent = (((trade['last'].to_f / symbol_data[symbol]['previous_close'].to_f) - 1) * 100).round(2)
change_percent = change_percent == 0 ? 0.0 : change_percent
symbol_data[trade['symbol']].merge!({
'last_price' => trade['last'].to_f,
'previous_close' => symbol_data[symbol]['previous_close'].to_f,
'change' => change,
'change_percent' => change_percent,
'trade_volume' => trade['vl'].to_i,
'cumulative_volume' => trade['cvol'].to_i
})
end
next if !symbol
# Don't send the same data twice.
next if original_symbol_data == symbol_data[symbol]
symbol_data[symbol].merge!({
'timestamp' => Time.now.getutc.strftime('%Y-%m-%d %H:%M:%S')
})
yield symbol_data[symbol] if block_given?
rescue => error
end
end
@http.errback do
@streaming = false
self.stop if !market_is_open
if !@stopping
puts "HTTP ERROR: #{@http.error}"
@http.close
sleep 1
puts 'Reconnecting to Tradeking...'
start_streaming_quotes(@symbols, &block)
else
@stopping = false
end
end
end
def stream(symbols, attempts = 0)
begin
attempts += 1
url = "https://stream.tradeking.com/v1/market/quotes.json?symbols=#{symbols.join(',')}"
@conn = nil
@conn = EventMachine::HttpRequest.new(url)
@conn.use EventMachine::Middleware::OAuth, @account.account_data
puts 'Connected to Tradeking'
return @conn.get
rescue => error
sleep 1
puts "Attempting reconnect..."
return stream(symbols, attempts) if attempts < 10
end
return @conn.get
end
def market_is_open
Time.now.getutc.to_i >= Time.parse("#{Date.today} 14:30:00 UTC").to_i && \
Time.now.getutc.to_i <= Time.parse("#{Date.today} 21:00:00 UTC").to_i && \
![0,6].include?(Date.today.wday)
end
end
end