ry / ebb fork watch download tarball
public this repo is viewable by everyone
Description: web server
Homepage: http://ebb.rubyforge.org
Clone URL: git://github.com/ry/ebb.git
Use rb_thread_select instead of rb_thread_schedule

This cleans up the event loop processing greatly. I use rb_thread_select
to watch for changes on every file descriptor before calling ev_loop(loop,

EVLOOP_NONBLOCK); There is a 0.5 second timeout, to return to ruby for
checking @running and signals.

I still need to run benchmarks on this to make sure that it is still fast
:)
Ryan Dahl (author)
2 months ago
commit  26e7c927f672e4a589e50387ec846c4d89a6b81e
tree    df9df0e423eaa18b0e6da4abee54dbe08a9296ba
parent  53f3e7f84e2c6571b6c97d6ccab3dfc4902253f6
...
4
5
6
7
 
8
9
10
...
18
19
20
 
 
 
 
21
22
23
...
41
42
43
44
 
45
46
47
...
4
5
6
 
7
8
9
10
...
18
19
20
21
22
23
24
25
26
27
...
45
46
47
 
48
49
50
51
0
@@ -4,7 +4,7 @@ require 'rake/gempackagetask'
0
 require 'rake/clean'
0
 
0
 COMMON_DISTFILES = FileList.new('src/ebb.{c,h}', 'src/parser.{c,h}',
0
- 'libev/*', 'VERSION', 'README')
0
+ 'libev/*', 'README')
0
 
0
 RUBY_DISTFILES = COMMON_DISTFILES + FileList.new('src/ebb_ruby.c',
0
   'src/extconf.rb', 'ruby_lib/**/*', 'benchmark/*.rb', 'bin/ebb_rails',
0
@@ -18,6 +18,10 @@ CLEAN.add ["**/*.{o,bundle,so,obj,pdb,lib,def,exp}", "benchmark/*.dump",
0
 
0
 CLOBBER.add ['src/Makefile', 'src/parser.c', 'src/mkmf.log', 'build']
0
 
0
+Rake::TestTask.new do |t|
0
+ t.test_files = FileList.new("test/*.rb")
0
+ t.verbose = true
0
+end
0
 
0
 def dir(path)
0
   File.expand_path File.join(File.dirname(__FILE__), path)
0
@@ -41,7 +45,7 @@ end
0
 
0
 task(:wc) { sh "wc -l ruby_lib/*.rb src/ebb*.{c,h}" }
0
 
0
-task(:test => :compile)
0
+task(:test => RUBY_DISTFILES)
0
 Rake::TestTask.new do |t|
0
   t.test_files = 'test/basic_test.rb'
0
   t.verbose = true
...
1
2
3
 
4
5
6
...
17
18
19
20
21
22
23
 
 
 
24
25
26
27
28
29
 
30
31
 
32
33
34
35
36
37
38
39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
41
42
...
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
 
 
 
 
 
102
103
104
...
116
117
118
 
 
 
 
119
120
121
...
123
124
125
126
127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
129
130
131
 
 
 
 
 
132
133
134
135
 
 
136
137
138
 
139
140
141
...
1
2
3
4
5
6
7
...
18
19
20
 
 
 
21
22
23
24
25
26
27
28
29
 
30
31
 
32
33
34
35
 
 
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
...
98
99
100
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
102
103
104
105
106
107
108
109
110
111
112
113
114
...
126
127
128
129
130
131
132
133
134
135
...
137
138
139
 
 
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
 
157
158
159
160
161
162
163
 
 
164
165
166
167
168
169
170
171
172
0
@@ -1,6 +1,7 @@
0
 # Ruby Binding to the Ebb Web Server
0
 # Copyright (c) 2008 Ry Dahl. This software is released under the MIT License.
0
 # See README file for details.
0
+require 'stringio'
0
 module Ebb
0
   LIBDIR = File.dirname(__FILE__)
0
   require Ebb::LIBDIR + '/../src/ebb_ext'
0
@@ -17,26 +18,68 @@ module Ebb
0
     Client::BASE_ENV['rack.multithread'] = threaded_processing
0
     
0
     FFI::server_listen_on_port(port)
0
-
0
- puts "Ebb listening at http://0.0.0.0:#{port}/ (#{threaded_processing ? 'threaded' : 'sequential'} processing)"
0
- trap('INT') { @running = false }
0
     @running = true
0
+ #trap('INT') { stop_server }
0
+
0
+ puts "Ebb listening at http://0.0.0.0:#{port}/ (#{threaded_processing ? 'threaded' : 'sequential'} processing, PID #{Process.pid})"
0
     
0
     while @running
0
       FFI::server_process_connections()
0
       while client = FFI::waiting_clients.shift
0
         if threaded_processing
0
- Thread.new(client) { |c| c.process(app) }
0
+ Thread.new(client) { |c| process(app, c) }
0
         else
0
- client.process(app)
0
+ process(app, client)
0
         end
0
       end
0
     end
0
-
0
- puts "Ebb unlistening"
0
     FFI::server_unlisten()
0
   end
0
   
0
+ def self.running?
0
+ FFI::server_open?
0
+ end
0
+
0
+ def self.stop_server()
0
+ @running = false
0
+ end
0
+
0
+ def self.process(app, client)
0
+ begin
0
+ status, headers, body = app.call(client.env)
0
+ rescue
0
+ raise if $DEBUG
0
+ status = 500
0
+ headers = {'Content-Type' => 'text/plain'}
0
+ body = "Internal Server Error\n"
0
+ end
0
+
0
+ client.write_status(status)
0
+
0
+ if headers.respond_to?(:[]=) and body.respond_to?(:length) and status != 304
0
+ headers['Connection'] = 'close'
0
+ headers['Content-Length'] = body.length.to_s
0
+ end
0
+
0
+ headers.each { |field, value| client.write_header(field, value) }
0
+ client.write("\r\n")
0
+
0
+ if body.kind_of?(String)
0
+ client.write(body)
0
+ client.body_written()
0
+ client.begin_transmission()
0
+ else
0
+ client.begin_transmission()
0
+ client.body.each { |p| write(p) }
0
+ client.body_written()
0
+ end
0
+ rescue => e
0
+ puts "Ebb Error! #{e.class} #{e.message}"
0
+ puts e.backtrace.join("\n")
0
+ ensure
0
+ client.release
0
+ end
0
+
0
   # This array is created and manipulated in the C extension.
0
   def FFI.waiting_clients
0
     @waiting_clients
0
@@ -55,50 +98,17 @@ module Ebb
0
       'rack.run_once' => false
0
     }
0
     
0
- def process(app)
0
- begin
0
- status, headers, body = app.call(env)
0
- rescue
0
- raise if $DEBUG
0
- status = 500
0
- headers = {'Content-Type' => 'text/plain'}
0
- body = "Internal Server Error\n"
0
- end
0
-
0
- status = status.to_i
0
- FFI::client_write_status(self, status, HTTP_STATUS_CODES[status])
0
-
0
- if headers.respond_to?(:[]=) and body.respond_to?(:length) and status != 304
0
- headers['Connection'] = 'close'
0
- headers['Content-Length'] = body.length.to_s
0
- end
0
-
0
- headers.each { |field, value| write_header(field, value) }
0
- write("\r\n")
0
-
0
- if body.kind_of?(String)
0
- write(body)
0
- body_written()
0
- begin_transmission()
0
- else
0
- begin_transmission()
0
- body.each { |p| write(p) }
0
- body_written()
0
- end
0
- rescue => e
0
- puts "Error! #{e.class} #{e.message}"
0
- ensure
0
- FFI::client_release(self)
0
- end
0
-
0
- private
0
-
0
     def env
0
       env = FFI::client_env(self).update(BASE_ENV)
0
       env['rack.input'] = RequestBody.new(self)
0
       env
0
     end
0
     
0
+ def write_status(status)
0
+ s = status.to_i
0
+ FFI::client_write_status(self, s, HTTP_STATUS_CODES[s])
0
+ end
0
+
0
     def write(data)
0
       FFI::client_write(self, data)
0
     end
0
@@ -116,6 +126,10 @@ module Ebb
0
     def begin_transmission
0
       FFI::client_begin_transmission(self)
0
     end
0
+
0
+ def release
0
+ FFI::client_release(self)
0
+ end
0
   end
0
   
0
   class RequestBody
0
@@ -123,19 +137,36 @@ module Ebb
0
       @client = client
0
     end
0
     
0
- def read(len)
0
- FFI::client_read_input(@client, len)
0
+ def read(len = nil)
0
+ if @io
0
+ @io.read(len)
0
+ else
0
+ if len.nil?
0
+ s = ''
0
+ while(chunk = read(10*1024)) do
0
+ s << chunk
0
+ end
0
+ s
0
+ else
0
+ FFI::client_read_input(@client, len)
0
+ end
0
+ end
0
     end
0
     
0
     def gets
0
- raise NotImplementedError
0
+ io.gets
0
+ end
0
+
0
+ def each(&block)
0
+ io.each(&block)
0
     end
0
     
0
- def each
0
- raise NotImplementedError
0
+ def io
0
+ @io ||= StringIO.new(read)
0
     end
0
   end
0
   
0
+
0
   HTTP_STATUS_CODES = {
0
     100 => 'Continue',
0
     101 => 'Switching Protocols',
...
28
29
30
 
31
32
33
...
42
43
44
 
45
46
47
...
51
52
53
54
55
 
 
 
 
 
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
 
 
 
77
78
79
80
81
 
82
83
84
85
86
87
88
 
 
 
 
 
 
 
 
89
90
91
92
93
 
 
 
 
 
 
94
95
96
...
259
260
261
 
262
263
264
...
28
29
30
31
32
33
34
...
43
44
45
46
47
48
49
...
53
54
55
 
 
56
57
58
59
60
61
62
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
65
66
67
 
 
 
 
68
69
 
 
 
 
 
 
70
71
72
73
74
75
76
77
78
 
 
 
 
79
80
81
82
83
84
85
86
87
...
250
251
252
253
254
255
256
0
@@ -28,6 +28,7 @@ static VALUE global_http_host;
0
  */
0
 static ebb_server *server;
0
 struct ev_loop *loop;
0
+static unsigned int client_count = 0;
0
 
0
 /* Variables with a leading underscore are C-level variables */
0
 
0
@@ -42,6 +43,7 @@ void request_cb(ebb_client *client, void *data)
0
   VALUE waiting_clients = (VALUE)data;
0
   VALUE rb_client = Data_Wrap_Struct(cClient, 0, 0, client);
0
   rb_ary_push(waiting_clients, rb_client);
0
+ client_count++;
0
 }
0
 
0
 VALUE server_listen_on_port(VALUE _, VALUE port)
0
@@ -51,46 +53,35 @@ VALUE server_listen_on_port(VALUE _, VALUE port)
0
   return Qnil;
0
 }
0
 
0
-static void
0
-oneshot_timeout (struct ev_loop *loop, struct ev_timer *w, int revents) {;}
0
+VALUE server_open(VALUE _)
0
+{
0
+ return server->open ? Qtrue : Qfalse;
0
+}
0
+
0
 
0
 VALUE server_process_connections(VALUE _)
0
 {
0
- /* This function is super hacky. The libev loop is called for one iteration
0
- * this means that any pending events are handled. If no events exist then
0
- * the function blocks. We want blocking so that the while loop in ruby
0
- * doesn't race away - however there is a need to continue to process other
0
- * ruby threads which are running. While this function is being called
0
- * other ruby threads cannot execute.
0
- * So we set this timeout event which breaks the block after 0.1 seconds.
0
- * Additionally we make sure that other threads get enough processing time
0
- * by calling rb_thread_schedule() many times.
0
- *
0
- * Instead we should probably use rb_thread_select on server->fd when no
0
- * clients are in_use? Whatever happens here, one should make sure the
0
- * 'wait' benchmark is running as quickly with Ebb as it does with mongrel.
0
- */
0
- ev_timer timeout;
0
- ev_timer_init (&timeout, oneshot_timeout, 0.1, 0.);
0
- ev_timer_start (loop, &timeout);
0
- ev_loop(loop, EVLOOP_ONESHOT);
0
+ int fd_count = 0, max_fd = 0;
0
+ struct timeval tv = { tv_sec: 0, tv_usec: 500000 };
0
+ int i;
0
   
0
- /* remove the timeout event so that it isn't called immediately the next
0
- * time around (since 0.1 seconds will have passed)
0
- */
0
- ev_timer_stop(loop, &timeout);
0
+ fd_set fds; FD_ZERO(&fds);
0
   
0
- /* Call rb_thread_schedule() proportional to the number of rb threads running */
0
- /* SO HACKY! Anyone have a better way to do this? */
0
- int i;
0
- for(i = 0; i < EBB_MAX_CLIENTS; i++)
0
- if(server->clients[i].in_use)
0
- rb_thread_schedule();
0
+ FD_SET(server->fd, &fds); fd_count++;
0
+ for(i = 0; i < EBB_MAX_CLIENTS; i++) {
0
+ ebb_client *client = &server->clients[i];
0
+ if(client->open) {
0
+ FD_SET(client->fd, &fds); fd_count++;
0
+ if(client->fd > max_fd) max_fd = client->fd;
0
+ }
0
+ }
0
   
0
- if(server->open)
0
- return Qtrue;
0
- else
0
- return Qfalse;
0
+ unsigned int last_client_count = client_count;
0
+ ev_loop(loop, EVLOOP_NONBLOCK);
0
+ if(last_client_count == client_count) {
0
+ rb_thread_select(max_fd+1, &fds, &fds, &fds, &tv);
0
+ ev_loop(loop, EVLOOP_NONBLOCK);
0
+ }
0
 }
0
 
0
 
0
@@ -259,6 +250,7 @@ void Init_ebb_ext()
0
   rb_define_singleton_method(mFFI, "server_process_connections", server_process_connections, 0);
0
   rb_define_singleton_method(mFFI, "server_listen_on_port", server_listen_on_port, 1);
0
   rb_define_singleton_method(mFFI, "server_unlisten", server_unlisten, 0);
0
+ rb_define_singleton_method(mFFI, "server_open?", server_open, 0);
0
   
0
   cClient = rb_define_class_under(mEbb, "Client", rb_cObject);
0
   rb_define_singleton_method(mFFI, "client_read_input", client_read_input, 2);
...
1
2
3
4
5
6
 
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
 
68
69
70
...
84
85
86
87
88
89
90
91
92
93
94
95
96
...
98
99
100
101
102
103
104
105
106
107
108
109
110
...
 
 
 
 
 
 
1
2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
4
5
6
...
20
21
22
 
 
 
 
 
 
 
23
24
25
...
27
28
29
 
 
 
 
 
 
 
 
 
30
0
@@ -1,70 +1,6 @@
0
-require File.dirname(__FILE__) + '/../ruby_lib/ebb'
0
-require 'test/unit'
0
-require 'net/http'
0
-require 'socket'
0
-require 'rubygems'
0
-require 'json'
0
+require File.dirname(__FILE__) + '/helper'
0
 
0
-PORT = 4044
0
-
0
-class EbbTest < Test::Unit::TestCase
0
- def setup
0
- @pid = fork do
0
- STDOUT.reopen "/dev/null", "a"
0
- server = Ebb::start_server(self, :port => PORT)
0
- end
0
- sleep 0.5
0
- end
0
-
0
- def teardown
0
- Process.kill('KILL', @pid)
0
- sleep 0.5
0
- end
0
-
0
- def get(path)
0
- Net::HTTP.get_response(URI.parse("http://0.0.0.0:#{PORT}#{path}"))
0
- end
0
-
0
- def post(path, data)
0
- Net::HTTP.post_form(URI.parse("http://0.0.0.0:#{PORT}#{path}"), data)
0
- end
0
-
0
- @@responses = {}
0
- def call(env)
0
- commands = env['PATH_INFO'].split('/')
0
-
0
- if commands.include?('bytes')
0
- n = commands.last.to_i
0
- raise "bytes called with n <= 0" if n <= 0
0
- body = @@responses[n] || "C"*n
0
- status = 200
0
-
0
- elsif commands.include?('test_post_length')
0
- input_body = ""
0
- while chunk = env['rack.input'].read(512)
0
- input_body << chunk
0
- end
0
-
0
- content_length_header = env['HTTP_CONTENT_LENGTH'].to_i
0
-
0
- if content_length_header == input_body.length
0
- body = "Content-Length matches input length"
0
- status = 200
0
- else
0
- body = "Content-Length header is #{content_length_header} but body length is #{input_body.length}"
0
- # content_length = #{env['HTTP_CONTENT_LENGTH'].to_i}
0
- # input_body.length = #{input_body.length}"
0
- status = 500
0
- end
0
-
0
- else
0
- status = 404
0
- body = "Undefined url"
0
- end
0
-
0
- [status, {'Content-Type' => 'text/plain'}, body]
0
- end
0
-
0
+class BasicTest < ServerTest
0
   def test_get_bytes
0
     [1,10,1000].each do |i|
0
       response = get("/bytes/#{i}")
0
@@ -84,13 +20,6 @@ class EbbTest < Test::Unit::TestCase
0
     end
0
   end
0
   
0
- # this is rough but does detect major problems
0
- def test_ab
0
- r = %x{ab -n 1000 -c 50 -q http://0.0.0.0:#{PORT}/bytes/123}
0
- assert r =~ /Requests per second:\s*(\d+)/, r
0
- assert $1.to_i > 100, r
0
- end
0
-
0
   def test_large_post
0
     [50,60,100].each do |i|
0
       response = post("/test_post_length", 'C'*1024*i)
0
@@ -98,12 +27,3 @@ class EbbTest < Test::Unit::TestCase
0
     end
0
   end
0
 end
0
-
0
-
0
-class EbbRailsTest < Test::Unit::TestCase
0
- # just to make sure there isn't some load error
0
- def test_ebb_rails_version
0
- out = %x{ruby #{Ebb::LIBDIR}/../bin/ebb_rails -v}
0
- assert_match %r{Ebb #{Ebb::VERSION}}, out
0
- end
0
-end
0
\ No newline at end of file
...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
0
@@ -0,0 +1,63 @@
0
+require 'rubygems'
0
+require File.dirname(__FILE__) + '/../ruby_lib/ebb'
0
+require 'test/unit'
0
+require 'net/http'
0
+require 'socket'
0
+require 'rubygems'
0
+require 'json'
0
+
0
+include Ebb
0
+
0
+TEST_PORT = 4044
0
+
0
+def get(path)
0
+ Net::HTTP.get_response(URI.parse("http://0.0.0.0:#{TEST_PORT}#{path}"))
0
+end
0
+
0
+def post(path, data)
0
+ Net::HTTP.post_form(URI.parse("http://0.0.0.0:#{TEST_PORT}#{path}"), data)
0
+end
0
+
0
+class HelperApp
0
+ def call(env)
0
+ commands = env['PATH_INFO'].split('/')
0
+
0
+ if commands.include?('bytes')
0
+ n = commands.last.to_i
0
+ raise "bytes called with n <= 0" if n <= 0
0
+ body = "C"*n
0
+ status = 200
0
+
0
+ elsif commands.include?('test_post_length')
0
+ input_body = env['rack.input'].read
0
+
0
+ content_length_header = env['HTTP_CONTENT_LENGTH'].to_i
0
+
0
+ if content_length_header == input_body.length
0
+ body = "Content-Length matches input length"
0
+ status = 200
0
+ else
0
+ body = "Content-Length header is #{content_length_header} but body length is #{input_body.length}"
0
+ status = 500
0
+ end
0
+
0
+ else
0
+ status = 404
0
+ body = "Undefined url"
0
+ end
0
+
0
+ [status, {'Content-Type' => 'text/plain'}, body]
0
+ end
0
+end
0
+
0
+class ServerTest < Test::Unit::TestCase
0
+ def setup
0
+ Thread.new { Ebb.start_server(HelperApp.new, :port => TEST_PORT) }
0
+ sleep 0.1 until Ebb.running?
0
+ end
0
+
0
+ def teardown
0
+ Ebb.stop_server
0
+ sleep 0.1 while Ebb.running?
0
+ end
0
+end

Comments

    No one has commented yet.