public
Rubygem
Description: Starling Message Queue - please contribute if you want commit access
Homepage: http://groups.google.com/group/starlingmq
Clone URL: git://github.com/starling/starling.git
starling / lib / starling / server.rb
100644 114 lines (88 sloc) 2.784 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
require 'socket'
require 'logger'
require 'fileutils'
require 'rubygems'
require 'eventmachine'
require 'analyzer_tools/syslog_logger'
 
here = File.dirname(__FILE__)
 
require File.join(here, 'queue_collection')
require File.join(here, 'handler')
 
module StarlingServer

  VERSION = "0.9.7.9"

  ##
  # A Starling server: merges user options with the defaults, sets up the
  # logger and the queue collection, and drives the EventMachine loop that
  # hands connections to Handler.

  class Base
    DEFAULT_HOST = '127.0.0.1'
    DEFAULT_PORT = 22122
    DEFAULT_PATH = "/tmp/starling/"
    DEFAULT_TIMEOUT = 60

    ##
    # Initialize a new Starling server and immediately start processing
    # requests. Does not return until the event loop has exited.
    #
    # +opts+ is an optional hash, whose valid options are:
    #
    # [:host] Host on which to listen (default is 127.0.0.1).
    # [:port] Port on which to listen (default is 22122).
    # [:path] Path to Starling queue logs. Default is /tmp/starling/
    # [:timeout] Time in seconds to wait before closing connections.
    # [:logger] A Logger object, an IO handle, or a path to the log.
    #           Anything else (including nil) logs to STDERR.
    # [:log_level] Logger verbosity. Default is Logger::ERROR. The older
    #              spelling :loglevel is accepted as well; :log_level wins
    #              when both are given.
    # [:syslog_channel] When set, log through SyslogLogger.new(channel)
    #                   instead of whatever :logger says.
    #
    # Other options are not interpreted here, but the whole hash is passed
    # on to Handler.

    def self.start(opts = {})
      new(opts).run
    end

    ##
    # The logger shared by every server in this process, or nil when no
    # server has been run yet.

    def self.logger
      # The class variable only comes into existence inside #run; guard the
      # read so an early call answers nil instead of raising NameError.
      defined?(@@logger) ? @@logger : nil
    end

    ##
    # Initialize a new Starling server, but do not accept connections or
    # process requests.
    #
    # +opts+ is as for +start+. The queue directory (+:path+) is created
    # here if it does not exist yet.

    def initialize(opts = {})
      defaults = {
        :host => DEFAULT_HOST,
        :port => DEFAULT_PORT,
        :path => DEFAULT_PATH,
        :timeout => DEFAULT_TIMEOUT,
        :server => self           # lets Handler reach back to this server
      }
      @opts = defaults.merge(opts)

      # Unknown stats read as 0.
      @stats = Hash.new(0)

      FileUtils.mkdir_p(@opts[:path])
    end

    ##
    # The logger in use. Same object as Base.logger, so it is nil until
    # +run+ has been called.

    def logger
      self.class.logger
    end

    ##
    # Start listening and processing requests. Blocks until the event loop
    # stops, then closes the queue collection and returns whatever its
    # +close+ returned.

    def run
      @stats[:start_time] = Time.now

      @@logger = build_logger

      @opts[:queue] = QueueCollection.new(@opts[:path])
      @@logger.level = log_level

      @@logger.info "Starling STARTUP on #{@opts[:host]}:#{@opts[:port]}"

      EventMachine.epoll
      EventMachine.set_descriptor_table_size(4096)

      close_result = nil
      begin
        EventMachine.run do
          EventMachine.start_server(@opts[:host], @opts[:port], Handler, @opts)
        end
      ensure
        # Runs on normal shutdown and also when the event loop raises, so
        # the queue collection is never left open.
        close_result = @opts[:queue].close
      end
      close_result
    end

    ##
    # Stop accepting new connections and shutdown gracefully.

    def stop
      EventMachine.stop_event_loop
    end

    ##
    # With no argument returns the whole stats hash; otherwise the value of
    # the named stat (0 when it was never recorded).
    #
    # NOTE(review): +:connections+ is hard-coded to 1 -- looks like a
    # placeholder rather than a real count; confirm before relying on it.

    def stats(stat = nil) #:nodoc:
      return @stats if stat.nil?
      return 1 if stat == :connections

      @stats[stat]
    end

    private

    ##
    # Pick the logger according to +:syslog_channel+ / +:logger+.

    def build_logger
      # Syslog takes precedence; decide first so no throw-away Logger (and
      # possibly a stray log file) is created.
      return SyslogLogger.new(@opts[:syslog_channel]) if @opts[:syslog_channel]

      target = @opts[:logger]
      case target
      when Logger then target
      when IO, String then Logger.new(target)
      else Logger.new(STDERR)
      end
    end

    ##
    # Resolve the verbosity from +:log_level+, the documented legacy key
    # +:loglevel+, or the Logger::ERROR default -- in that order.

    def log_level
      @opts[:log_level] || @opts[:loglevel] || Logger::ERROR
    end
  end
end