Skip to content
This repository
Browse code

Wader now raises exceptions on certain classes of fatal http response…

…s for invalid authentication, unknown streams and invalid parameters.
  • Loading branch information...
commit e0ac8b661dc87c530006a064c8b238ed40a226ce 1 parent 2a551de
Hayes Davis authored
51 docs/NOTES.md
Source Rendered
@@ -32,4 +32,53 @@ Capabilities
32 32 Event Dispatch
33 33 --------------
34 34 * How can it generically handle dispatching events?
35   -** Could route event/tweet to particular queue using resque given configurable queue and class name
  35 +** Could route event/tweet to particular queue using resque given configurable queue and class name
  36 +
  37 +Streaming API Scenarios
  38 +-----------------------
  39 +* No connection at all - nobody is home
  40 + * Retry maximum number of times, then give up
  41 +* Various disconnections (clean or otherwise)
  42 + * Retry up to max times, then give up
  43 +* Error codes: See http://dev.twitter.com/pages/streaming_api_response_codes
  44 + * 401 and 403 - Auth issues
  45 + * Likely to happen at startup
  46 + * Should probably stop the entire flamingod with a message
  47 + * 404 - Not found
  48 + * Likely an invalid path
  49 + * Should probably stop the entire flamingod
  50 + * 406 - Not acceptable
  51 + * Could happen due to lack of params. If there are no params, we should
  52 + go into a waiting state to wait for more params.
  53 + * If we get this and we do have params, it's a big deal because it impacts
  54 + our connectivity to twitter. Need to do something to alert the user.
  55 + * 413 - Parameters too long, outside of counts for role
  56 + * A big deal, it means we can't connect to Twitter. Needs immediate fixing.
  57 + * User should be alerted in some way
  58 + * 416 - Unacceptable range, outside of values for role
  59 + * Same as 406, 413
  60 + * 500 - Server error
  61 + * Probably should wait and retry after some period
  62 + * 503 - Overloaded
  63 + * Probably should wait and retry after some (longish) period
  64 +
  65 +Fatal Error Classes:
  66 +* Authentication: 401, 403
  67 +* Invalid Stream: 404
  68 +* Invalid Parameters: 406, 413, 416
  69 +
  70 +Retry-able Transient Error Classes:
  71 +* Server unavailable
  72 +* Connection closed
  73 +* Server HTTP Errors: 5XX
  74 +
  75 +Should Introduce a Connectivity Status
  76 +--------------------------------------
  77 +Flamingo:Stream:Status
  78 + * Connecting - The wader is in the process of connecting to the server
  79 + * Connected - The wader is connected and receiving tweets
  80 + * Disconnected-Retry - The wader is not connected but is trying
  81 + * Disconnected-Fatal - The wader is disconnected and can't get reconnected
  82 +
  83 +Flamingo:Stream:Status:Message
  84 + * String describing what happened
35 lib/flamingo/daemon/flamingod.rb
@@ -58,14 +58,25 @@ def trap_signals
58 58 end
59 59
60 60 def restart_wader
61   - Flamingo.logger.info "Flamingod restarting wader pid=#{@wader.pid} with SIGINT"
62   - @wader.kill("INT")
  61 + if @wader
  62 + Flamingo.logger.info "Flamingod restarting wader pid=#{@wader.pid} with SIGINT"
  63 + @wader.kill("INT")
  64 + else
  65 + Flamingo.logger.info "Wader is not started. Attempting to start new wader."
  66 + @wader = start_new_wader
  67 + end
63 68 end
64 69
65 70 def signal_children(sig)
66 71 pids = (children.map {|c| c.pid}).join(",")
67 72 Flamingo.logger.info "Flamingod sending SIG#{sig} to pids=#{pids}"
68   - children.each {|child| child.signal(sig) }
  73 + children.each do |child|
  74 + begin
  75 + child.signal(sig)
  76 + rescue => e
  77 + Flamingo.logger.info "Failure sending SIG#{sig} to child #{child.pid}: #{e}"
  78 + end
  79 + end
69 80 end
70 81
71 82 def terminate!
@@ -75,7 +86,7 @@ def terminate!
75 86 end
76 87
77 88 def children
78   - [@wader,@web_server] + @dispatchers
  89 + ([@wader,@web_server] + @dispatchers).compact
79 90 end
80 91
81 92 def start_children
@@ -92,9 +103,10 @@ def start_children
92 103 def wait_on_children()
93 104 until exit_signaled?
94 105 child_pid = Process.wait(-1)
  106 + child_status = $?
95 107 unless exit_signaled?
96   - if @wader.pid == child_pid
97   - @wader = start_new_wader
  108 + if @wader && @wader.pid == child_pid
  109 + handle_wader_exit(child_status)
98 110 elsif @web_server.pid == child_pid
99 111 @web_server = start_new_web_server
100 112 elsif (to_delete = @dispatchers.find{|d| d.pid == child_pid})
@@ -106,6 +118,17 @@ def wait_on_children()
106 118 end
107 119 end
108 120 end
  121 +
  122 + def handle_wader_exit(status)
  123 + if WaderProcess.fatal_exit?(status)
  124 + Flamingo.logger.error "Wader exited with status "+
  125 + "#{status.exitstatus} and cannot be automatically restarted"
  126 + $stderr.write("Wader exited with fatal error. Check the the log.")
  127 + terminate!
  128 + else
  129 + @wader = start_new_wader
  130 + end
  131 + end
109 132
110 133 def run_as_daemon
111 134 pid_file = PidFile.new
32 lib/flamingo/daemon/wader_process.rb
... ... @@ -1,6 +1,23 @@
1 1 module Flamingo
2 2 module Daemon
3 3 class WaderProcess < ChildProcess
  4 +
  5 + EXIT_CLEAN = 0
  6 +
  7 + EXIT_FATAL_RANGE = 100..199
  8 + EXIT_AUTHENTICATION = 100
  9 + EXIT_INVALID_PARAMS = 101
  10 + EXIT_UNKNOWN_STREAM = 102
  11 +
  12 +
  13 + class << self
  14 + def fatal_exit?(status)
  15 + if status
  16 + EXIT_FATAL_RANGE.include?(status.exitstatus)
  17 + end
  18 + end
  19 + end
  20 +
4 21 def register_signal_handlers
5 22 trap("INT") { stop }
6 23 end
@@ -16,8 +33,19 @@ def run
16 33
17 34 @wader = Flamingo::Wader.new(screen_name,password,stream)
18 35 Flamingo.logger.info "Starting wader on pid=#{Process.pid} under pid=#{Process.ppid}"
19   - @wader.run
20   - Flamingo.logger.info "Wader pid=#{Process.pid} stopped"
  36 +
  37 + exit_code = EXIT_CLEAN
  38 + begin
  39 + @wader.run
  40 + rescue Flamingo::Wader::AuthenticationError
  41 + exit_code = EXIT_AUTHENTICATION
  42 + rescue Flamingo::Wader::UnknownStreamError
  43 + exit_code = EXIT_UNKNOWN_STREAM
  44 + rescue Flamingo::Wader::InvalidParametersError
  45 + exit_code = EXIT_INVALID_PARAMS
  46 + end
  47 + Flamingo.logger.info "Wader pid=#{Process.pid} stopped with code #{exit_code}"
  48 + exit(exit_code)
21 49 end
22 50
23 51 def stop
37 lib/flamingo/wader.rb
... ... @@ -1,5 +1,23 @@
1 1 module Flamingo
2 2 class Wader
  3 +
  4 + class FatalError < StandardError
  5 + end
  6 +
  7 + class FatalHttpStatusError < FatalError
  8 +
  9 + attr_accessor :code
  10 +
  11 + def initialize(message,code)
  12 + super(message)
  13 + self.code = code
  14 + end
  15 + end
  16 +
  17 + class AuthenticationError < FatalHttpStatusError; end
  18 + class UnknownStreamError < FatalHttpStatusError; end
  19 + class InvalidParametersError < FatalHttpStatusError; end
  20 +
3 21 attr_accessor :screen_name, :password, :stream, :connection
4 22
5 23 def initialize(screen_name,password,stream)
@@ -25,7 +43,16 @@ def run
25 43 end
26 44
27 45 connection.on_error do |message|
28   - dispatch_error(:generic,message)
  46 + code = connection.code
  47 + if [401,403].include?(code)
  48 + stop_and_raise!(AuthenticationError.new(message,code))
  49 + elsif code == 404
  50 + stop_and_raise!(UnknownStreamError.new(message,code))
  51 + elsif [406,413,416].include?(code)
  52 + stop_and_raise!(InvalidParametersError.new(message,code))
  53 + else
  54 + dispatch_error(:generic,message)
  55 + end
29 56 end
30 57
31 58 connection.on_reconnect do |timeout, retries|
@@ -45,6 +72,7 @@ def run
45 72 stop
46 73 end
47 74 end
  75 + raise @error if @error
48 76 end
49 77
50 78 def stop
@@ -53,15 +81,20 @@ def stop
53 81 end
54 82
55 83 private
  84 + def stop_and_raise!(error)
  85 + stop
  86 + @error = error
  87 + end
  88 +
56 89 def dispatch_event(event_json)
57 90 Flamingo.logger.debug "Wader dispatched event"
58 91 Resque.enqueue(Flamingo::DispatchEvent, event_json)
59   - # Resque.enqueue(Flamingo::DispatchEvent, {:src => "flamingo:#{stream.name}"}, event_json)
60 92 end
61 93
62 94 def dispatch_error(type,message,data={})
63 95 Flamingo.logger.error "Received error: #{message}"
64 96 Resque.enqueue(Flamingo::DispatchError, type, message, data)
65 97 end
  98 +
66 99 end
67 100 end
7 test/test_helper.rb
... ... @@ -0,0 +1,7 @@
  1 +require 'rubygems'
  2 +require 'test/unit'
  3 +require 'mockingbird'
  4 +#require "/Users/hayesdavis/Appozite/Projects/mockingbird/lib/mockingbird"
  5 +
  6 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__) + '/../lib')
  7 +require "flamingo"
30 test/wader/helper.rb
... ... @@ -0,0 +1,30 @@
  1 +require "test/unit"
  2 +require "#{File.dirname(__FILE__)}/../test_helper"
  3 +
  4 +class MockStream < Flamingo::Stream
  5 +
  6 + def initialize()
  7 + super(:filter,Flamingo::StreamParams.new(:filter))
  8 + end
  9 +
  10 + def connect(opts={})
  11 + opts = opts.merge({:host=>'localhost',:port=>8080})
  12 + Twitter::JSONStream.connect(opts)
  13 + end
  14 +
  15 +end
  16 +
  17 +# This is a little odd but turns out to be a reasonably good way to test the
  18 +# wader. All the wader ever does is enqueue jobs for further processing so
  19 +# we can monitor its behavior by seeing what jobs it enqueues
  20 +module Resque
  21 +
  22 + def after_enqueue(&block)
  23 + @handler = block
  24 + end
  25 +
  26 + def enqueue(*args)
  27 + @handler.call(*args) if @handler
  28 + end
  29 +
  30 +end
68 test/wader/test_fatal_http_responses.rb
... ... @@ -0,0 +1,68 @@
  1 +require "#{File.dirname(__FILE__)}/helper"
  2 +
  3 +class TestAuthentication < Test::Unit::TestCase
  4 +
  5 + def setup
  6 + Flamingo.config = Flamingo::Config.new
  7 + Flamingo.logger = Logger.new("/dev/null")
  8 + end
  9 +
  10 + def test_unauthorized_is_fatal
  11 + run_test_for_status_code(401,"Unauthorized",
  12 + Flamingo::Wader::AuthenticationError)
  13 + end
  14 +
  15 + def test_forbidden_is_fatal
  16 + run_test_for_status_code(403,"Forbidden",
  17 + Flamingo::Wader::AuthenticationError)
  18 + end
  19 +
  20 + def test_unknown_is_fatal
  21 + run_test_for_status_code(404,"Unknown",
  22 + Flamingo::Wader::UnknownStreamError)
  23 + end
  24 +
  25 + def test_not_acceptable_is_fatal
  26 + run_test_for_status_code(406,"Not Acceptable",
  27 + Flamingo::Wader::InvalidParametersError)
  28 + end
  29 +
  30 + def test_too_long_is_fatal
  31 + run_test_for_status_code(413,"Too Long",
  32 + Flamingo::Wader::InvalidParametersError)
  33 + end
  34 +
  35 + def test_range_unacceptable_is_fatal
  36 + run_test_for_status_code(416,"Range Unacceptable",
  37 + Flamingo::Wader::InvalidParametersError)
  38 + end
  39 +
  40 + private
  41 + def run_test_for_status_code(code,message,error_type)
  42 + Mockingbird.setup(:port=>8080) do
  43 + status code, message
  44 + end
  45 +
  46 + error = nil
  47 +
  48 + wader = Flamingo::Wader.new('user','pass',MockStream.new)
  49 +
  50 + Resque.after_enqueue do |job,*args|
  51 + fail("Shouldn't enqueue anything")
  52 + end
  53 +
  54 + begin
  55 + wader.run
  56 + rescue => e
  57 + error = e
  58 + assert_equal(error_type,e.class)
  59 + assert_equal(code,e.code)
  60 + end
  61 +
  62 + assert_not_nil(error)
  63 +
  64 + ensure
  65 + Mockingbird.teardown
  66 + end
  67 +
  68 +end

0 comments on commit e0ac8b6

Please sign in to comment.
Something went wrong with that request. Please try again.