public
Description: New and ultra-turbo-crazy-fast backend for Thin
Homepage: http://code.macournoyer.com/thin/
Clone URL: git://github.com/macournoyer/thin-turbo.git
macournoyer (author)
Fri May 09 13:40:27 -0700 2008
commit  ba8f86c952e337db6e5f079337a1cd7c092151cf
tree    c348a99946698eafbad28868cbb754f4c6e4e470
parent  edab8a1dbcfcd9c70ab912423a69a5f5767dece6
thin-turbo / ext / thin_backend / response.c
100644 148 lines (109 sloc) 4.287 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include "thin.h"
 
static void response_send_chunk(connection_t *c, const char *ptr, size_t len)
{
  /* chunk too big, split it in smaller chunks and send each separately */
  if (len >= BUFFER_MAX_LEN) {
    size_t i, size = BUFFER_MAX_LEN - 1, slices = len / size + 1;
    
    for (i = 0; i < slices; ++i) {
      if (i == slices - 1)
        size = len % size;
      
      response_send_chunk(c, (char *) ptr + i * size, size);
    }
    
    return;
  }
  
  /* if appending will overflow the buffer we wait till more is sent */
  while (c->write_buffer.len + len > BUFFER_MAX_LEN)
    ev_loop(c->loop, EVLOOP_ONESHOT);
  
  buffer_append(&c->write_buffer, ptr, len);
  
  /* If we have a good sized chunk of data to send, try to send it right away.
* This allows streaming by going for a shot in the even loop to drain the buffer if possisble,
this way, the chunk is sent if the socket is writable.*/
  if (c->write_buffer.len - c->write_buffer.offset >= STREAM_SIZE) {
    ev_loop(c->loop, EVLOOP_ONESHOT | EVLOOP_NONBLOCK);
  }
}
 
static void response_send_status(connection_t *c, const int status)
{
  buffer_t *buf = &c->write_buffer;
  char *status_line = get_status_line(status);
  
  buffer_append(buf, RESP_HTTP_VERSION, sizeof(RESP_HTTP_VERSION) - 1);
  buffer_append(buf, status_line, strlen(status_line));
  buffer_append(buf, CRLF, sizeof(CRLF) - 1);
}
 
static VALUE iter_header(VALUE value, VALUE *args)
{
  connection_t *c = (connection_t *) args[0];
  VALUE key = (VALUE) args[1];
  
  response_send_chunk(c, RSTRING_PTR(key), RSTRING_LEN(key));
  response_send_chunk(c, HEADER_SEP, sizeof(HEADER_SEP) - 1);
  
  /* if value ends w/ line break w/ chomp it! */
  size_t len = RSTRING_LEN(value);
  if (RSTRING_PTR(value)[RSTRING_LEN(value) - 1] == '\n')
    len--;
  
  response_send_chunk(c, RSTRING_PTR(value), len);
  response_send_chunk(c, CRLF, sizeof(CRLF) - 1);
  
  return Qnil;
}
 
static void response_send_headers(connection_t *c, VALUE headers)
{
  VALUE hash, keys, key, value;
  size_t i;
  
  keys = rb_funcall(headers, sInternedKeys, 0);
  
  for (i = 0; i < RARRAY_LEN(keys); ++i) {
    key = RARRAY_PTR(keys)[i];
    value = rb_hash_aref(headers, key);
    
    VALUE args[2] = { (VALUE) c, key };
    
    rb_iterate(rb_each, value, iter_header, (VALUE) args);
  }
  
  response_send_chunk(c, CRLF, sizeof(CRLF) - 1);
}
 
static VALUE iter_body(VALUE chunk, VALUE *val_conn)
{
  connection_t *c = (connection_t *) val_conn;
  
  response_send_chunk(c, RSTRING_PTR(chunk), RSTRING_LEN(chunk));
  
  return Qnil;
}
 
static void response_send_body(connection_t *c, VALUE body)
{
  if (TYPE(body) == T_STRING && RSTRING_LEN(body) < BUFFER_MAX_LEN) {
    /* Calling String#each creates several other strings which is slower and use more mem,
* also Ruby 1.9 doesn't define that method anymore, so it's better to send one big string. */
    response_send_chunk(c, RSTRING_PTR(body), RSTRING_LEN(body));
    
  } else {
    /* Iterate over body#each and send each yielded chunk */
    rb_iterate(rb_each, body, iter_body, (VALUE) c);
    
  }
}
 
static VALUE response_run(connection_t *c)
{
  /* Call the app to process the request */
  VALUE response = rb_funcall_rescue(c->backend->app, sInternedCall, 1, c->env);
  
  if (response == Qundef) {
    /* log any error */
    rb_funcall(c->backend->obj, rb_intern("log_last_exception"), 0);
    connection_close(c);
    
  } else {
    /* store response info and prepare for writing */
    int status = FIX2INT(rb_ary_entry(response, 0));
    VALUE headers = rb_ary_entry(response, 1);
    VALUE body = rb_ary_entry(response, 2);
    
    /* read buffer no longer needed, free up now so we
* can reuse some of it for write buffer */
    buffer_reset(&c->read_buffer);
    
    connection_watch_writable(c);
    
    response_send_status(c, status);
    response_send_headers(c, headers);
    response_send_body(c, body);
    
    c->finished = 1;
    
    if (buffer_eof(&c->write_buffer))
      connection_close(c);
    
  }
  
  c->backend->thread_count--;
  
  return Qnil;
}
 
void response_process(connection_t *c)
{
  c->backend->thread_count++;
  /* call the Rack app in a Ruby green thread */
  rb_thread_create(response_run, (void*) c);
}