igrigorik / em-http-request

Asynchronous HTTP Client (EventMachine + Ruby)

This URL has Read+Write access

em-http-request / ext / buffer / em_buffer.c
86c71e9f » igrigorik 2008-08-22 initial import 1 /*
2 * Copyright (C) 2007 Tony Arcieri
3 * You may redistribute this under the terms of the Ruby license.
4 * See LICENSE for details
5 */
6
7 #include "ruby.h"
8 #include "rubyio.h"
9
10 #include <assert.h>
11
12 #include <string.h>
13 #include <time.h>
14 #include <errno.h>
15
/*
 * FPTR_TO_FD(fptr): the integer file descriptor behind a Ruby open-file
 * struct. When rubyio.h provides GetReadFile, the descriptor is obtained
 * from the read-side FILE * via fileno(); otherwise the struct's fd field
 * is used directly. The argument is parenthesized so that expressions such
 * as &x or p + 1 expand correctly.
 */
#ifndef GetReadFile
#define FPTR_TO_FD(fptr) ((fptr)->fd)
#else
#define FPTR_TO_FD(fptr) (fileno(GetReadFile(fptr)))
#endif
21
/* Tunables for node sizing and pool trimming */
enum {
  /* Data capacity, in bytes, of each buffer node unless overridden in initialize */
  DEFAULT_NODE_SIZE = 16384,

  /* Age in seconds after which buffer_gc may free an unused pooled node */
  MAX_AGE = 60,

  /* Minimum number of seconds between two scans of the pool */
  PURGE_INTERVAL = 10
};
30
/*
 * A byte queue: a singly linked list of data nodes (head..tail) plus a
 * pool of spare nodes (pool_head..pool_tail) kept around for reuse.
 */
struct buffer {
  time_t last_purged_at;                     /* when buffer_gc last scanned the pool */
  unsigned size, node_size;                  /* bytes queued; data capacity of every node */
  struct buffer_node *head, *tail;           /* queued data, oldest first */
  struct buffer_node *pool_head, *pool_tail; /* spare nodes */
};

/*
 * One fixed-capacity chunk of the queue. Bytes data[start..end) hold
 * queued data.
 */
struct buffer_node {
  time_t last_used_at;     /* stamped when the node is released to the pool */
  unsigned start, end;
  struct buffer_node *next;
  unsigned char data[];    /* C99 flexible array member: node_size bytes allocated with the node */
};
45
46 static VALUE mEm = Qnil;
47 static VALUE cEm_Buffer = Qnil;
48
49 static VALUE Em_Buffer_allocate(VALUE klass);
50 static void Em_Buffer_mark(struct buffer *);
51 static void Em_Buffer_free(struct buffer *);
52
53 static VALUE Em_Buffer_initialize(int argc, VALUE *argv, VALUE self);
54 static VALUE Em_Buffer_clear(VALUE self);
55 static VALUE Em_Buffer_size(VALUE self);
56 static VALUE Em_Buffer_empty(VALUE self);
57 static VALUE Em_Buffer_append(VALUE self, VALUE data);
58 static VALUE Em_Buffer_prepend(VALUE self, VALUE data);
59 static VALUE Em_Buffer_read(int argc, VALUE *argv, VALUE self);
60 static VALUE Em_Buffer_to_str(VALUE self);
61 static VALUE Em_Buffer_read_from(VALUE self, VALUE io);
62 static VALUE Em_Buffer_write_to(VALUE self, VALUE io);
63
64 static struct buffer *buffer_new(void);
65 static void buffer_clear(struct buffer *buf);
66 static void buffer_free(struct buffer *buf);
67 static void buffer_gc(struct buffer *buf);
68 static void buffer_prepend(struct buffer *buf, char *str, unsigned len);
69 static void buffer_append(struct buffer *buf, char *str, unsigned len);
70 static void buffer_read(struct buffer *buf, char *str, unsigned len);
71 static void buffer_copy(struct buffer *buf, char *str, unsigned len);
72 static int buffer_read_from(struct buffer *buf, int fd);
73 static int buffer_write_to(struct buffer *buf, int fd);
74
75 /*
76 * High speed buffering geared towards non-blocking I/O.
77 *
78 * Data is stored in a byte queue implemented as a linked list of equal size
79 * chunks. Since every node in the list is the same size they are easily
80 * memory pooled. Routines are provided for high speed non-blocking reads
81 * and writes from Ruby IO objects.
82 */
83 void Init_em_buffer()
84 {
85 mEm = rb_define_module("EventMachine");
86 cEm_Buffer = rb_define_class_under(mEm, "Buffer", rb_cObject);
87 rb_define_alloc_func(cEm_Buffer, Em_Buffer_allocate);
88
89 rb_define_method(cEm_Buffer, "initialize", Em_Buffer_initialize, -1);
90 rb_define_method(cEm_Buffer, "clear", Em_Buffer_clear, 0);
91 rb_define_method(cEm_Buffer, "size", Em_Buffer_size, 0);
92 rb_define_method(cEm_Buffer, "empty?", Em_Buffer_empty, 0);
93 rb_define_method(cEm_Buffer, "<<", Em_Buffer_append, 1);
94 rb_define_method(cEm_Buffer, "append", Em_Buffer_append, 1);
95 rb_define_method(cEm_Buffer, "prepend", Em_Buffer_prepend, 1);
96 rb_define_method(cEm_Buffer, "read", Em_Buffer_read, -1);
97 rb_define_method(cEm_Buffer, "to_str", Em_Buffer_to_str, 0);
98 rb_define_method(cEm_Buffer, "read_from", Em_Buffer_read_from, 1);
99 rb_define_method(cEm_Buffer, "write_to", Em_Buffer_write_to, 1);
100 }
101
102 static VALUE Em_Buffer_allocate(VALUE klass)
103 {
104 return Data_Wrap_Struct(klass, Em_Buffer_mark, Em_Buffer_free, buffer_new());
105 }
106
/*
 * GC mark hook. struct buffer references no Ruby objects, so nothing needs
 * marking. The hook instead gives buffer_gc a periodic chance to release
 * pooled nodes that have sat unused for a while.
 */
static void Em_Buffer_mark(struct buffer *b)
{
  buffer_gc(b);
}
112
/* GC free hook: release the buffer and all of its nodes */
static void Em_Buffer_free(struct buffer *b)
{
  buffer_free(b);
}
117
118 /**
119 * call-seq:
120 * EventMachine::Buffer.new(size = DEFAULT_NODE_SIZE) -> EventMachine::Buffer
121 *
122 * Create a new EventMachine::Buffer with linked segments of the given size
123 */
124 static VALUE Em_Buffer_initialize(int argc, VALUE *argv, VALUE self)
125 {
126 VALUE node_size_obj;
127 int node_size;
128 struct buffer *buf;
129
130 if(rb_scan_args(argc, argv, "01", &node_size_obj) == 1) {
131 node_size = NUM2INT(node_size_obj);
132
133 if(node_size < 1) rb_raise(rb_eArgError, "invalid buffer size");
134
135 Data_Get_Struct(self, struct buffer, buf);
136
137 /* Make sure we're not changing the buffer size after data has been allocated */
138 assert(!buf->head);
139 assert(!buf->pool_head);
140
141 buf->node_size = node_size;
142 }
143
144 return Qnil;
145 }
146
147 /**
148 * call-seq:
149 * EventMachine::Buffer#clear -> nil
150 *
151 * Clear all data from the EventMachine::Buffer
152 */
153 static VALUE Em_Buffer_clear(VALUE self)
154 {
155 struct buffer *buf;
156 Data_Get_Struct(self, struct buffer, buf);
157
158 buffer_clear(buf);
159
160 return Qnil;
161 }
162
163 /**
164 * call-seq:
165 * EventMachine::Buffer#size -> Integer
166 *
167 * Return the size of the buffer in bytes
168 */
169 static VALUE Em_Buffer_size(VALUE self)
170 {
171 struct buffer *buf;
172 Data_Get_Struct(self, struct buffer, buf);
173
174 return INT2NUM(buf->size);
175 }
176
177 /**
178 * call-seq:
179 * EventMachine::Buffer#empty? -> Boolean
180 *
181 * Is the buffer empty?
182 */
183 static VALUE Em_Buffer_empty(VALUE self)
184 {
185 struct buffer *buf;
186 Data_Get_Struct(self, struct buffer, buf);
187
188 return buf->size > 0 ? Qfalse : Qtrue;
189 }
190
191 /**
192 * call-seq:
193 * EventMachine::Buffer#append(data) -> String
194 *
195 * Append the given data to the end of the buffer
196 */
197 static VALUE Em_Buffer_append(VALUE self, VALUE data)
198 {
199 struct buffer *buf;
200 Data_Get_Struct(self, struct buffer, buf);
201
202 /* Is this needed? Never seen anyone else do it... */
203 data = rb_convert_type(data, T_STRING, "String", "to_str");
204 buffer_append(buf, RSTRING_PTR(data), RSTRING_LEN(data));
205
206 return data;
207 }
208
209 /**
210 * call-seq:
211 * EventMachine::Buffer#prepend(data) -> String
212 *
213 * Prepend the given data to the beginning of the buffer
214 */
215 static VALUE Em_Buffer_prepend(VALUE self, VALUE data)
216 {
217 struct buffer *buf;
218 Data_Get_Struct(self, struct buffer, buf);
219
220 data = rb_convert_type(data, T_STRING, "String", "to_str");
221 buffer_prepend(buf, RSTRING_PTR(data), RSTRING_LEN(data));
222
223 return data;
224 }
225
226 /**
227 * call-seq:
228 * EventMachine::Buffer#read(length = nil) -> String
229 *
230 * Read the specified abount of data from the buffer. If no value
231 * is given the entire contents of the buffer are returned. Any data
232 * read from the buffer is cleared.
233 */
234 static VALUE Em_Buffer_read(int argc, VALUE *argv, VALUE self)
235 {
236 VALUE length_obj, str;
237 int length;
238 struct buffer *buf;
239
240 Data_Get_Struct(self, struct buffer, buf);
241
242 if(rb_scan_args(argc, argv, "01", &length_obj) == 1) {
243 length = NUM2INT(length_obj);
244 } else {
245 if(buf->size == 0)
246 return rb_str_new2("");
247
248 length = buf->size;
249 }
250
251 if(length > buf->size)
252 length = buf->size;
253
254 if(length < 1)
255 rb_raise(rb_eArgError, "length must be greater than zero");
256
257 str = rb_str_new(0, length);
258 buffer_read(buf, RSTRING_PTR(str), length);
259
260 return str;
261 }
262
263 /**
264 * call-seq:
265 * EventMachine::Buffer#to_str -> String
266 *
267 * Convert the Buffer to a String. The original buffer is unmodified.
268 */
269 static VALUE Em_Buffer_to_str(VALUE self) {
270 VALUE str;
271 struct buffer *buf;
272
273 Data_Get_Struct(self, struct buffer, buf);
274
275 str = rb_str_new(0, buf->size);
276 buffer_copy(buf, RSTRING_PTR(str), buf->size);
277
278 return str;
279 }
280
281 /**
282 * call-seq:
283 * EventMachine::Buffer#read_from(io) -> Integer
284 *
285 * Perform a nonblocking read of the the given IO object and fill
286 * the buffer with any data received. The call will read as much
287 * data as it can until the read would block.
288 */
289 static VALUE Em_Buffer_read_from(VALUE self, VALUE io) {
290 struct buffer *buf;
291 #if HAVE_RB_IO_T
292 rb_io_t *fptr;
293 #else
294 OpenFile *fptr;
295 #endif
296
297 Data_Get_Struct(self, struct buffer, buf);
298 GetOpenFile(rb_convert_type(io, T_FILE, "IO", "to_io"), fptr);
299 rb_io_set_nonblock(fptr);
300
301 return INT2NUM(buffer_read_from(buf, FPTR_TO_FD(fptr)));
302 }
303
304 /**
305 * call-seq:
306 * EventMachine::Buffer#write_to(io) -> Integer
307 *
308 * Perform a nonblocking write of the buffer to the given IO object.
309 * As much data as possible is written until the call would block.
310 * Any data which is written is removed from the buffer.
311 */
312 static VALUE Em_Buffer_write_to(VALUE self, VALUE io) {
313 struct buffer *buf;
314 #if HAVE_RB_IO_T
315 rb_io_t *fptr;
316 #else
317 OpenFile *fptr;
318 #endif
319
320 Data_Get_Struct(self, struct buffer, buf);
321 GetOpenFile(rb_convert_type(io, T_FILE, "IO", "to_io"), fptr);
322 rb_io_set_nonblock(fptr);
323
324 return INT2NUM(buffer_write_to(buf, FPTR_TO_FD(fptr)));
325 }
326
327 /*
328 * Ruby bindings end here. Below is the actual implementation of
329 * the underlying data structures.
330 */
331
332 /* Create a new buffer */
333 static struct buffer *buffer_new(void)
334 {
335 struct buffer *buf;
336
337 buf = (struct buffer *)xmalloc(sizeof(struct buffer));
338 buf->head = buf->tail = buf->pool_head = buf->pool_tail = 0;
339 buf->size = 0;
340 buf->node_size = DEFAULT_NODE_SIZE;
341 time(&buf->last_purged_at);
342
343 return buf;
344 }
345
346 /* Clear all data from a buffer */
347 static void buffer_clear(struct buffer *buf)
348 {
349 struct buffer_node *tmp;
350
351 /* Move everything into the buffer pool */
352 if(!buf->pool_tail)
353 buf->pool_head = buf->pool_tail = buf->head;
354 else
355 buf->pool_tail->next = buf->head;
356
357 buf->head = buf->tail = 0;
358 buf->size = 0;
359 }
360
361 /* Free a buffer */
362 static void buffer_free(struct buffer *buf)
363 {
364 struct buffer_node *tmp;
365
366 buffer_clear(buf);
367
368 while(buf->pool_head) {
369 tmp = buf->pool_head;
370 buf->pool_head = tmp->next;
371 free(tmp);
372 }
373
374 free(buf);
375 }
376
377 /* Run through the pool and find elements that haven't been used for awhile */
378 static void buffer_gc(struct buffer *buf)
379 {
380 struct buffer_node *cur, *tmp;
381 time_t now;
382 time(&now);
383
384 /* Only purge if we've passed the purge interval */
385 if(now - buf->last_purged_at < PURGE_INTERVAL)
386 return;
387
388 buf->last_purged_at = now;
389
390 while(buf->pool_head && now - buf->pool_head->last_used_at >= MAX_AGE) {
391 tmp = buf->pool_head;
392 buf->pool_head = buf->pool_head->next;
393 free(tmp);
394 }
395
396 if(!buf->pool_head)
397 buf->pool_tail = 0;
398 }
399
400 /* Create a new buffer_node (or pull one from the memory pool) */
401 static struct buffer_node *buffer_node_new(struct buffer *buf)
402 {
403 struct buffer_node *node;
404
405 /* Pull from the memory pool if available */
406 if(buf->pool_head) {
407 node = buf->pool_head;
408 buf->pool_head = node->next;
409
410 if(node->next)
411 node->next = 0;
412 else
413 buf->pool_tail = 0;
414 } else {
415 node = (struct buffer_node *)xmalloc(sizeof(struct buffer_node) + buf->node_size);
416 node->next = 0;
417 }
418
419 node->start = node->end = 0;
420 return node;
421 }
422
423 /* Free a buffer node (i.e. return it to the memory pool) */
424 static void buffer_node_free(struct buffer *buf, struct buffer_node *node)
425 {
426 /* Store when the node was freed */
427 time(&node->last_used_at);
428
429 node->next = buf->pool_head;
430 buf->pool_head = node;
431
432 if(!buf->pool_tail)
433 buf->pool_tail = node;
434 }
435
436 /* Prepend data to the front of the buffer */
437 static void buffer_prepend(struct buffer *buf, char *str, unsigned len)
438 {
439 struct buffer_node *node, *tmp;
440 buf->size += len;
441
442 /* If it fits in the beginning of the head */
443 if(buf->head && buf->head->start >= len) {
444 buf->head->start -= len;
445 memcpy(buf->head->data + buf->head->start, str, len);
446 } else {
447 node = buffer_node_new(buf);
448 node->next = buf->head;
449 buf->head = node;
450 if(!buf->tail) buf->tail = node;
451
452 while(len > buf->node_size) {
453 memcpy(node->data, str, buf->node_size);
454 node->end = buf->node_size;
455
456 tmp = buffer_node_new(buf);
457 tmp->next = node->next;
458 node->next = tmp;
459
460 if(buf->tail == node) buf->tail = tmp;
461 node = tmp;
462
463 str += buf->node_size;
464 len -= buf->node_size;
465 }
466
467 if(len > 0) {
468 memcpy(node->data, str, len);
469 node->end = len;
470 }
471 }
472 }
473
474 /* Append data to the front of the buffer */
475 static void buffer_append(struct buffer *buf, char *str, unsigned len)
476 {
477 unsigned nbytes;
478 buf->size += len;
479
480 /* If it fits in the remaining space in the tail */
481 if(buf->tail && len <= buf->node_size - buf->tail->end) {
482 memcpy(buf->tail->data + buf->tail->end, str, len);
483 buf->tail->end += len;
484 return;
485 }
486
487 /* Empty list needs initialized */
488 if(!buf->head) {
489 buf->head = buffer_node_new(buf);
490 buf->tail = buf->head;
491 }
492
493 /* Build links out of the data */
494 while(len > 0) {
495 nbytes = buf->node_size - buf->tail->end;
496 if(len < nbytes) nbytes = len;
497
498 memcpy(buf->tail->data + buf->tail->end, str, nbytes);
499 str += nbytes;
500 len -= nbytes;
501
502 buf->tail->end += nbytes;
503
504 if(len > 0) {
505 buf->tail->next = buffer_node_new(buf);
506 buf->tail = buf->tail->next;
507 }
508 }
509 }
510
511 /* Read data from the buffer (and clear what we've read) */
512 static void buffer_read(struct buffer *buf, char *str, unsigned len)
513 {
514 unsigned nbytes;
515 struct buffer_node *tmp;
516
517 while(buf->size > 0 && len > 0) {
518 nbytes = buf->head->end - buf->head->start;
519 if(len < nbytes) nbytes = len;
520
521 memcpy(str, buf->head->data + buf->head->start, nbytes);
522 str += nbytes;
523 len -= nbytes;
524
525 buf->head->start += nbytes;
526 buf->size -= nbytes;
527
528 if(buf->head->start == buf->head->end) {
529 tmp = buf->head;
530 buf->head = tmp->next;
531 buffer_node_free(buf, tmp);
532
533 if(!buf->head) buf->tail = 0;
534 }
535 }
536 }
537
538 /* Copy data from the buffer without clearing it */
539 static void buffer_copy(struct buffer *buf, char *str, unsigned len)
540 {
541 unsigned nbytes;
542 struct buffer_node *node;
543
544 node = buf->head;
545 while(node && len > 0) {
546 nbytes = node->end - node->start;
547 if(len < nbytes) nbytes = len;
548
549 memcpy(str, node->data + node->start, nbytes);
550 str += nbytes;
551 len -= nbytes;
552
553 if(node->start + nbytes == node->end)
554 node = node->next;
555 }
556 }
557
558 /* Write data from the buffer to a file descriptor */
559 static int buffer_write_to(struct buffer *buf, int fd)
560 {
561 int bytes_written, total_bytes_written = 0;
562 struct buffer_node *tmp;
563
564 while(buf->head) {
565 bytes_written = write(fd, buf->head->data + buf->head->start, buf->head->end - buf->head->start);
566
567 /* If the write failed... */
568 if(bytes_written < 0) {
569 if(errno != EAGAIN)
570 rb_sys_fail("write");
571
572 return total_bytes_written;
573 }
574
575 total_bytes_written += bytes_written;
576 buf->size -= bytes_written;
577
578 /* If the write blocked... */
579 if(bytes_written < buf->head->end - buf->head->start) {
580 buf->head->start += bytes_written;
581 return total_bytes_written;
582 }
583
584 /* Otherwise we wrote the whole buffer */
585 tmp = buf->head;
586 buf->head = tmp->next;
587 buffer_node_free(buf, tmp);
588
589 if(!buf->head) buf->tail = 0;
590 }
591
592 return total_bytes_written;
593 }
594
595 /* Read data from a file descriptor to a buffer */
596 /* Append data to the front of the buffer */
597 static int buffer_read_from(struct buffer *buf, int fd)
598 {
599 int bytes_read, total_bytes_read = 0;
600 unsigned nbytes;
601
602 /* Empty list needs initialized */
603 if(!buf->head) {
604 buf->head = buffer_node_new(buf);
605 buf->tail = buf->head;
606 }
607
608 do {
609 nbytes = buf->node_size - buf->tail->end;
610 bytes_read = read(fd, buf->tail->data + buf->tail->end, nbytes);
611
612 if(bytes_read < 1) {
613 if(errno != EAGAIN)
614 rb_sys_fail("read");
615
616 return total_bytes_read;
617 }
618
619 total_bytes_read += bytes_read;
620 buf->tail->end += nbytes;
621 buf->size += nbytes;
622
623 if(buf->tail->end == buf->node_size) {
624 buf->tail->next = buffer_node_new(buf);
625 buf->tail = buf->tail->next;
626 }
627 } while(bytes_read == nbytes);
628
629 return total_bytes_read;
630 }