public
Description: New and ultra-turbo-crazy-fast backend for Thin
Homepage: http://code.macournoyer.com/thin/
Clone URL: git://github.com/macournoyer/thin-turbo.git
macournoyer (author)
Wed Apr 30 20:53:03 -0700 2008
commit  edab8a1dbcfcd9c70ab912423a69a5f5767dece6
tree    6da3857a454eeb9d7e6463d51d9c6e3234196609
parent  c1e069fc34f4a5964770375db8b938f4126aa435
thin-turbo / ext / thin_backend / connection.c
100644 190 lines (142 sloc) 4.446 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include "thin.h"
 
/* event callbacks */
 
/*
 * libev I/O callback for the write watcher: pushes the unsent part of
 * c->write_buffer to the socket.
 *
 * - Does nothing while the write buffer is empty.
 * - A transient send() failure (EAGAIN / EWOULDBLOCK / EINTR) is not an
 *   error: the data stays queued and is retried on the next event.
 * - Any other send() failure is logged and the connection is closed via
 *   connection_errno().
 * - When the whole buffer has been sent it is reset, and the connection is
 *   closed if it was marked finished.
 *
 * NOTE(review): the write watcher is left running after the buffer drains
 * on an unfinished connection, so this callback keeps firing while the
 * socket is writable. Stopping it here would require every producer to call
 * connection_watch_writable() after appending - confirm before changing.
 */
static void connection_writable_cb(EV_P_ struct ev_io *watcher, int revents)
{
  connection_t *c = get_ev_data(connection, watcher, write);
  ssize_t sent; /* send() returns ssize_t, not int */
  
  if (c->write_buffer.len == 0)
    return;
  
  sent = send(c->fd,
              (char *) c->write_buffer.ptr + c->write_buffer.offset,
              c->write_buffer.len - c->write_buffer.offset,
              0);
  
  if (sent < 0) {
    /* check errno right away, before any other call can overwrite it */
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
      return; /* nothing was written, try again on the next event */
    
    connection_errno(c);
    return;
  }
  
  /* the peer accepted data: push the inactivity timeout back */
  ev_timer_again(c->loop, &c->timeout_watcher);
  
  c->write_buffer.offset += sent;
  
  if (buffer_eof(&c->write_buffer)) {
    /* if all the buffer is written we can clear it from memory */
    buffer_reset(&c->write_buffer);
    
    /* the response can be streamed, so do not close unless it's marked finished */
    if (c->finished)
      connection_close(c);
  }
}
 
/*
 * Activate the connection's write watcher on its event loop so that
 * the write callback set up in connection_start() gets invoked.
 */
void connection_watch_writable(connection_t *c)
{
  struct ev_io *w = &c->write_watcher;
  
  ev_io_start(c->loop, w);
}
 
/*
 * libev I/O callback for the read watcher: reads one chunk (at most
 * BUFFER_CHUNK_SIZE bytes) from the socket and hands it to request_parse().
 *
 * - A transient recv() failure (EAGAIN / EWOULDBLOCK / EINTR) is ignored;
 *   the read is retried on the next event.
 * - Any other recv() failure is logged and the connection is closed via
 *   connection_errno().
 * - recv() returning 0 means the peer has shut down its sending side. The
 *   fd then stays readable forever, so the read watcher is stopped to avoid
 *   re-entering this callback in a tight loop. The connection itself is
 *   left to be closed by the write path or by the timeout watcher, so a
 *   response that is still being produced is not cut off.
 * - The inactivity timeout is only pushed back when data actually arrived.
 */
static void connection_readable_cb(EV_P_ struct ev_io *watcher, int revents)
{
  connection_t *c = get_ev_data(connection, watcher, read);
  ssize_t n; /* signed: recv() reports failure as -1 */
  char buf[BUFFER_CHUNK_SIZE];
  
  n = recv(c->fd, buf, BUFFER_CHUNK_SIZE, 0);
  
  if (n < 0) {
    /* check errno right away, before any other call can overwrite it */
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
      return; /* no data this time, read again on the next event */
    
    /* error, closing connection */
    connection_errno(c);
    return;
  }
  
  if (n == 0) {
    /* end of stream: no more data will ever arrive on this socket */
    ev_io_stop(c->loop, &c->read_watcher);
    return;
  }
  
  /* refresh the timeout before parsing; must happen while c is still open */
  ev_timer_again(c->loop, &c->timeout_watcher);
  
  request_parse(c, buf, (size_t) n);
}
 
/*
 * libev timer callback: the connection's timeout watcher fired, so the
 * connection owning that watcher is closed.
 */
static void connection_timeout_cb(EV_P_ struct ev_timer *watcher, int revents)
{
  connection_close(get_ev_data(connection, watcher, timeout));
}
 
 
/* public api */
 
/*
 * Set up a pooled connection for a freshly accepted socket.
 *
 *   backend      backend whose connection queue, loop and timeout are used
 *   fd           socket descriptor to watch
 *   remote_addr  peer address; its textual form is stored in c->remote_addr
 *
 * A connection is popped from backend->connections (the pool is grown with
 * connections_push() when the queue is empty), its state is reinitialised,
 * and the timeout and read watchers are started. The write watcher is only
 * initialised here; it is started by connection_watch_writable().
 *
 * NOTE(review): inet_ntoa() returns a pointer to a static buffer that the
 * next call overwrites. If c->remote_addr is a plain char pointer, every
 * connection ends up pointing at the same storage - confirm and copy the
 * string if so.
 */
void connection_start(backend_t *backend, int fd, struct sockaddr_in remote_addr)
{
  connection_t *c;
  
  /* take an unused connection, allocating another batch if none is left */
  c = (connection_t *) queue_pop(&backend->connections);
  if (c == NULL) {
    connections_push(backend);
    c = (connection_t *) queue_pop(&backend->connections);
  }
  
  assert(c != NULL);
  
  /* per-connection state */
  c->backend = backend;
  c->loop = backend->loop;
  c->fd = fd;
  c->remote_addr = inet_ntoa(remote_addr.sin_addr);
  c->content_length = 0;
  c->finished = 0;
  
  /* new env hash; register the slot's address with the Ruby GC */
  c->env = rb_hash_new();
  rb_gc_register_address(&c->env);
  
  /* start from empty buffers */
  buffer_reset(&c->read_buffer);
  buffer_reset(&c->write_buffer);
  
  /* fresh parser state, pointing back at this connection */
  http_parser_init(&c->parser);
  c->parser.data = c;
  
  /* read watcher */
  ev_io_init(&c->read_watcher, connection_readable_cb, c->fd, EV_READ);
  c->read_watcher.data = c;
  
  /* write watcher (not started here) */
  ev_io_init(&c->write_watcher, connection_writable_cb, c->fd, EV_WRITE);
  c->write_watcher.data = c;
  
  /* timeout watcher: backend->timeout is both the initial delay and the repeat */
  ev_timer_init(&c->timeout_watcher, connection_timeout_cb, backend->timeout, backend->timeout);
  c->timeout_watcher.data = c;
  
  /* arm the timeout first, then begin reading */
  ev_timer_start(c->loop, &c->timeout_watcher);
  ev_io_start(c->loop, &c->read_watcher);
}
 
/*
 * Report msg through the owning backend's log_error(), then close the
 * connection.
 */
void connection_error(connection_t *c, const char *msg)
{
  backend_t *owner = c->backend;
  
  log_error(owner, msg);
  connection_close(c);
}
 
/*
 * Fail the connection using the message for the current errno value:
 * strerror(errno) is passed on to connection_error().
 */
void connection_errno(connection_t *c)
{
  const char *reason = strerror(errno);
  
  connection_error(c, reason);
}
 
/*
 * Tear down a connection and hand it back to the backend's pool.
 *
 * All three watchers are stopped before the descriptor is closed, both
 * buffers are reset, the env and input slots are unregistered from the
 * Ruby GC, and the connection is pushed onto backend->connections.
 */
void connection_close(connection_t *c)
{
  /* detach from the event loop while the fd is still open */
  ev_timer_stop(c->loop, &c->timeout_watcher);
  ev_io_stop(c->loop, &c->write_watcher);
  ev_io_stop(c->loop, &c->read_watcher);
  
  close(c->fd);
  
  /* drop any buffered data */
  buffer_reset(&c->write_buffer);
  buffer_reset(&c->read_buffer);
  
  /* the Ruby GC no longer needs to track these slots */
  rb_gc_unregister_address(&c->env);
  rb_gc_unregister_address(&c->input);
  
  /* TODO maybe kill the thread also: rb_thread_kill(c->thread) */
  
  /* the connection is unused again: return it to the queue */
  queue_push(&c->backend->connections, c);
}
 
 
/* connections */
 
/*
 * Prepare the backend's connection pool: initialise the queue, then
 * fill it with a first batch of connections via connections_push().
 */
void connections_init(backend_t *backend)
{
  /* begin with an initialised queue ... */
  queue_init(&backend->connections);
  
  /* ... and allocate the first batch right away */
  connections_push(backend);
}
 
/*
 * Grow the backend's pool by CONNECTIONS_SIZE connections.
 *
 * Each connection is allocated zero-filled, so no field starts out holding
 * indeterminate bytes. It then gets its parser callbacks and both buffers
 * initialised before being pushed onto backend->connections.
 *
 * Allocation failure is fatal: debug builds stop on the assert, and builds
 * compiled with NDEBUG call abort() instead of dereferencing NULL.
 */
void connections_push(backend_t *backend)
{
  size_t i;
  connection_t *c;
  
  for (i = 0; i < CONNECTIONS_SIZE; ++i) {
    c = calloc(1, sizeof *c);
    assert(c);
    if (c == NULL)
      abort(); /* assert is compiled out under NDEBUG */
    
    parser_callbacks_setup(c);
    
    buffer_init(&c->read_buffer);
    buffer_init(&c->write_buffer);
    
    queue_push(&backend->connections, c);
  }
}
 
/*
 * Release every connection currently sitting in the backend's queue:
 * connections are popped one by one and freed until the queue is empty.
 *
 * NOTE(review): only the connection_t itself is freed here. Whether the
 * buffers set up by buffer_init() own memory that needs a separate release
 * is not visible from this file - confirm.
 */
void connections_free(backend_t *backend)
{
  connection_t *c;
  
  while ((c = queue_pop(&backend->connections)) != NULL)
    free(c);
}