Skip to content

Commit

Permalink
Switching from one Streaming API connection per user to one API conne…
Browse files Browse the repository at this point in the history
…ction for whole users.
  • Loading branch information
gimite committed Oct 9, 2010
1 parent 7f23da4 commit 0bb98b5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ tss_config.rb
data
exp
log
script
6 changes: 1 addition & 5 deletions lib/tss_web_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ class TSSWebServer < Sinatra::Base
get("/buzz") do
result = []
for (lang_id, lang_name) in [["en", "English"], ["ja", "Japanese"]]
words = get_buzz_words(lang_id)
if lang_id == "ja"
words = words.grep(/^\#/)
end
words = words[0, 10]
words = get_buzz_words(lang_id).grep(/^\#[a-zA-Z0-9_]+$/)[0, 10]
result.push({"lang_id" => lang_id, "lang_name" => lang_name, "words" => words})
end
content_type("text/javascript", :charset => "utf-8")
Expand Down
128 changes: 90 additions & 38 deletions lib/tss_web_socket_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
require "open-uri"
require "logger"
require "uri"
require "thread"
require "timeout"

require "rubygems"
require "json"
Expand All @@ -31,6 +33,9 @@ class TSSWebSocketServer

def initialize(logger = nil, test_only = false)
@logger = logger || Logger.new(STDERR)
@mutex = Mutex.new()
@query_to_wsocks = {}
@stream_thread = nil
if !test_only
params = {
:accepted_domains => [URI.parse(TSSConfig::BASE_URL).host],
Expand All @@ -42,6 +47,8 @@ def initialize(logger = nil, test_only = false)
end

def run()
@warm_up = true
Thread.new(){ sleep(60); @warm_up = false }
@server.run() do |ws|
@logger.info("Connection accepted: #{ws.object_id}")
@logger.info("Path: #{ws.path}, Origin: #{ws.origin}")
Expand All @@ -58,57 +65,98 @@ def run()
raise("session_id missing") if !session_id
session = Session.get(session_id)
auth_params = {
:oauth_access_token => session[:access_token],
:oauth_access_token_secret => session[:access_token_secret],
:oauth_access_token => TSSConfig::TEST_ACCESS_TOKEN,
:oauth_access_token_secret => TSSConfig::TEST_ACCESS_TOKEN_SECRET
}
query = params["q"][0]
twitter_thread = ws_thread = nil
twitter_thread = Thread.new() do
begin
res = search(query, auth_params)
if res["results"]
entries = res["results"].reverse()
convert_entries(entries)
send(ws, {"entries" => entries})
# For Streaming API, directly modifies JSON instead of parsing JSON and converting
# back to JSON for efficiency.
get_search_stream(query, auth_params) do |json|
json.slice!(json.length - 1, 1) # Deletes last '}'
s = '{"entries": [%s,"now":%f}]}' % [json, Time.now.to_f()]
send_raw(ws, s)
query = params["q"][0].downcase
stream_enabled = false
res = search(query, auth_params)
if res["results"]
entries = res["results"].reverse()
convert_entries(entries)
send(ws, {"entries" => entries})
if query =~ /\A\#[a-z0-9_]+\z/
@mutex.synchronize() do
if @stream_thread && @stream_thread.alive? && @query_to_wsocks.has_key?(query)
@query_to_wsocks[query].push(ws)
@logger.info("query_to_wsocks = %p" % [@query_to_wsocks.map(){ |k, v| [k, v.size] }])
else
if @stream_thread && !@warm_up
Thread.new() do
@stream_thread.kill() rescue nil
end
end
@query_to_wsocks[query] ||= []
@query_to_wsocks[query].push(ws)
@logger.info("query_to_wsocks = %p" % [@query_to_wsocks.map(){ |k, v| [k, v.size] }])
queries = @query_to_wsocks.keys
if @warm_up
@logger.info("warming up, stream pending")
else
@stream_thread = Thread.new(){ stream_thread_main(queries, auth_params) }
end
end
else
send(ws, {"error" => res["error"]})
end
rescue => ex
print_backtrace(ex)
stream_enabled = true
else
send(ws, {"error" => "Auto update works only for hash tags."})
end
@logger.info("Disconnected by Twitter: #{ws.object_id}")
while !ws_thread; end
# ws.close_socket() here doesn't unblock ws.receive() in Ruby 1.9.
# I don't use ws.close() here either because it may not be supported by WebSocket
# client implementation based on old protocol.
ws_thread.kill()
else
send(ws, {"error" => res["error"]})
end
begin
while ws.receive()
end
rescue => ex
end
ws_thread = Thread.new() do
begin
while ws.receive()
@logger.info("Disconnected by user: #{ws.object_id}")
if stream_enabled
@mutex.synchronize() do
@query_to_wsocks[query].delete(ws)
if @query_to_wsocks[query].empty?
@query_to_wsocks.delete(query)
end
rescue => ex
@logger.info("query_to_wsocks = %p" % [@query_to_wsocks.map(){ |k, v| [k, v.size] }])
end
@logger.info("Disconnected by user: #{ws.object_id}")
while !twitter_thread; end
twitter_thread.kill()
end
twitter_thread.join()
ws_thread.join()
else
ws.handshake("404 Not Found")
end
@logger.info("Connection closed: #{ws.object_id}")
end
end

def stream_thread_main(queries, auth_params)
begin
api_query = queries.join(",")
@logger.info("stream_thread_begin: %s" % api_query)
get_search_stream(api_query, auth_params) do |json|
if json =~ /"text":"(([^"\\]|\\.)*)"/
text = $1.downcase
json.slice!(json.length - 1, 1) # Deletes last '}'
s = '{"entries": [%s,"now":%f}]}' % [json, Time.now.to_f()]
# Uses mutex here, otherwise it may cause exception that adding key during iteration.
@mutex.synchronize() do
for query, wsocks in @query_to_wsocks
if text.index(query)
for wsock in wsocks
send_raw(wsock, s)
end
end
end
end
end
end
rescue => ex
print_backtrace(ex)
ensure
@logger.info("stream_thread_end: %s" % api_query)
end
@logger.info("start warm up")
@warm_up = true
Thread.new(){ sleep(60); @warm_up = false }
end

def get_search_stream(query, auth_params, &block)
buffer = ""
#url = URI.parse("http://192.168.1.7:12000/")
Expand All @@ -129,7 +177,7 @@ def get_search_stream(query, auth_params, &block)
end
end
else
raise(res.to_s())
raise("%s %s" % [res.to_s(), res.body])
end
end
end
Expand Down Expand Up @@ -220,7 +268,11 @@ def print_data(data)

def send_raw(ws, str)
begin
ws.send(str)
Timeout.timeout(1) do
ws.send(str)
end
rescue Timeout::Error => ex
print_backtrace(ex)
rescue => ex
print_backtrace(ex)
end
Expand Down

0 comments on commit 0bb98b5

Please sign in to comment.