Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

implemenmted several optimizations

  • Loading branch information...
commit 6ae16bf9d8c678bd8a12cfc8008a397f700b1b8e 1 parent 40684b1
@arut authored
Showing with 73 additions and 90 deletions.
  1. +26 −3 ngx_rtmp.h
  2. +28 −42 ngx_rtmp_handler.c
  3. +19 −45 ngx_rtmp_shared.c
View
29 ngx_rtmp.h
@@ -171,6 +171,9 @@ typedef struct {
} ngx_rtmp_stream_t;
+#define NGX_RTMP_OUT_QUEUE 256
+
+
typedef struct {
uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */
@@ -217,8 +220,8 @@ typedef struct {
ngx_int_t in_chunk_size_changing;
/* circular buffer of RTMP message pointers */
- ngx_chain_t **out_start, **out_end;
- ngx_chain_t **out_pos, **out_last;
+ size_t out_pos, out_last;
+ ngx_chain_t *out[NGX_RTMP_OUT_QUEUE];
ngx_chain_t *out_chain;
u_char *out_bpos;
} ngx_rtmp_session_t;
@@ -362,13 +365,33 @@ ngx_int_t ngx_rtmp_amf_shared_object_handler(ngx_rtmp_session_t *s,
/* Shared output buffers */
+
+/* Store refcount in negative bytes of shared buffer */
+
+#define NGX_RTMP_REFCOUNT_TYPE uint32_t
+#define NGX_RTMP_REFCOUNT_BYTES sizeof(NGX_RTMP_REFCOUNT_TYPE)
+
+#define ngx_rtmp_ref(b) \
+ *((NGX_RTMP_REFCOUNT_TYPE*)(b) - 1)
+
+#define ngx_rtmp_ref_set(b, v) \
+ ngx_rtmp_ref(b) = v
+
+#define ngx_rtmp_ref_get(b) \
+ ++ngx_rtmp_ref(b)
+
+#define ngx_rtmp_ref_put(b) \
+ --ngx_rtmp_ref(b)
+
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf);
-void ngx_rtmp_acquire_shared_chain(ngx_chain_t *in);
void ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf,
ngx_chain_t *in);
ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf,
ngx_chain_t *head, ngx_chain_t *in);
+#define ngx_rtmp_acquire_shared_chain(in) \
+ ngx_rtmp_ref_get(in); \
+
/* Sending messages */
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
View
70 ngx_rtmp_handler.c
@@ -243,13 +243,13 @@ ngx_rtmp_init_session(ngx_connection_t *c)
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
- s->out_start = ngx_palloc(c->pool, sizeof(ngx_chain_t *) * cscf->max_queue);
+ /*s->out_start = s->out;ngx_palloc(c->pool, sizeof(ngx_chain_t *) * cscf->max_queue);
if (s->out_start == NULL) {
ngx_rtmp_close_connection(c);
return;
- }
- s->out_pos = s->out_last = s->out_start;
- s->out_end = s->out_start + cscf->max_queue;
+ }*/
+ /*s->out_pos = s->last = s->out;s->out_last = s->out_start;*/
+ /*s->out_end = s->out_start + sizeof(s->out) / sizeof(s->out[0]);cscf->max_queue*/;
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
@@ -824,6 +824,11 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_del_timer(wev);
}
+ if (s->out_chain == NULL && s->out_pos != s->out_last) {
+ s->out_chain = s->out[s->out_pos];
+ s->out_bpos = s->out_chain->buf->pos;
+ }
+
while (s->out_chain) {
n = c->send(c, s->out_bpos, s->out_chain->buf->last - s->out_bpos);
@@ -844,15 +849,13 @@ ngx_rtmp_send(ngx_event_t *wev)
if (s->out_bpos == s->out_chain->buf->last) {
s->out_chain = s->out_chain->next;
if (s->out_chain == NULL) {
- ngx_rtmp_free_shared_chain(cscf, *s->out_pos);
+ ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos]);
++s->out_pos;
- if (s->out_pos == s->out_end) {
- s->out_pos = s->out_start;
- }
+ s->out_pos %= NGX_RTMP_OUT_QUEUE;
if (s->out_pos == s->out_last) {
break;
}
- s->out_chain = *s->out_pos;
+ s->out_chain = s->out[s->out_pos];
}
s->out_bpos = s->out_chain->buf->pos;
}
@@ -1012,48 +1015,34 @@ ngx_int_t
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
ngx_uint_t priority)
{
- ngx_connection_t *c;
- ngx_rtmp_core_srv_conf_t *cscf;
- size_t nmsg;
-
- c = s->connection;
- cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
+ ngx_int_t nmsg;
- nmsg = (s->out_pos <= s->out_last)
- ? s->out_last - s->out_pos
- : (s->out_end - s->out_pos) + (s->out_last - s->out_start);
- ++nmsg;
+ nmsg = (s->out_last - s->out_pos) % NGX_RTMP_OUT_QUEUE + 1;
/* drop packet?
* Note we always leave 1 slot free */
- if (nmsg >= (s->out_end - s->out_start) / (priority + 1)) {
- ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
+ if (nmsg * (priority + 1) >= NGX_RTMP_OUT_QUEUE) {
+ ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP drop message bufs=%ui, priority=%ui",
nmsg, priority);
return NGX_AGAIN;
}
- ngx_rtmp_acquire_shared_chain(out);
-
- *s->out_last++ = out;
- if (s->out_last >= s->out_end) {
- s->out_last = s->out_start;
- }
+ s->out[s->out_last++] = out;
+ s->out_last %= NGX_RTMP_OUT_QUEUE;
- if (s->out_chain == NULL) {
- s->out_chain = out;
- s->out_bpos = out->buf->pos;
- }
+ ngx_rtmp_acquire_shared_chain(out);
- ngx_log_debug4(NGX_LOG_DEBUG_RTMP, c->log, 0,
- "RTMP send nmsg=%ui, priority=%ui, ready=%d, active=%d",
- nmsg, priority, c->write->ready, c->write->active);
+ ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
+ "RTMP send nmsg=%ui, priority=%ui",
+ nmsg, priority);
- if (!c->write->active) {
- ngx_rtmp_send(c->write);
+ if (!s->connection->write->active) {
+ ngx_rtmp_send(s->connection->write);
+ /*return ngx_add_event(s->connection->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT);*/
}
- return c->destroyed ? NGX_ERROR : NGX_OK;
+ return NGX_OK;
}
@@ -1255,11 +1244,8 @@ ngx_rtmp_close_session_handler(ngx_event_t *e)
}
while (s->out_pos != s->out_last) {
- ngx_rtmp_free_shared_chain(cscf, *s->out_pos);
- ++s->out_pos;
- if (s->out_pos == s->out_end) {
- s->out_pos = s->out_start;
- }
+ ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos++]);
+ s->out_pos %= NGX_RTMP_OUT_QUEUE;
}
ngx_rtmp_close_connection(c);
View
64 ngx_rtmp_shared.c
@@ -6,30 +6,13 @@
#include "ngx_rtmp.h"
-/* Store refcount in negative bytes of shared buffer */
-
-#define NGX_RTMP_REFCOUNT_TYPE uint32_t
-#define NGX_RTMP_REFCOUNT_BYTES sizeof(NGX_RTMP_REFCOUNT_TYPE)
-
-#define ngx_rtmp_ref(b) \
- *((NGX_RTMP_REFCOUNT_TYPE*)(b) - 1)
-
-#define ngx_rtmp_ref_set(b, v) \
- ngx_rtmp_ref(b) = v
-
-#define ngx_rtmp_ref_get(b) \
- ++ngx_rtmp_ref(b)
-
-#define ngx_rtmp_ref_put(b) \
- --ngx_rtmp_ref(b)
-
-
ngx_chain_t *
ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
{
- ngx_chain_t *out;
- ngx_buf_t *b;
- size_t size;
+ u_char *p;
+ ngx_chain_t *out;
+ ngx_buf_t *b;
+ size_t size;
if (cscf->free) {
out = cscf->free;
@@ -37,27 +20,25 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
} else {
- out = ngx_alloc_chain_link(cscf->pool);
- if (out == NULL) {
- return NULL;
- }
+ size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
- out->buf = ngx_calloc_buf(cscf->pool);
- if (out->buf == NULL) {
+ p = ngx_pcalloc(cscf->pool, NGX_RTMP_REFCOUNT_BYTES
+ + sizeof(ngx_chain_t)
+ + sizeof(ngx_buf_t)
+ + size);
+ if (p == NULL) {
return NULL;
}
- size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER
- + NGX_RTMP_REFCOUNT_BYTES;
+ p += NGX_RTMP_REFCOUNT_BYTES;
+ out = (ngx_chain_t *)p;
- b = out->buf;
- b->start = ngx_palloc(cscf->pool, size);
- if (b->start == NULL) {
- return NULL;
- }
+ p += sizeof(ngx_chain_t);
+ out->buf = (ngx_buf_t *)p;
- b->start += NGX_RTMP_REFCOUNT_BYTES;
- b->end = b->start + size - NGX_RTMP_REFCOUNT_BYTES;
+ p += sizeof(ngx_buf_t);
+ out->buf->start = p;
+ out->buf->end = p + size;
}
out->next = NULL;
@@ -66,25 +47,18 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
b->memory = 1;
/* buffer has refcount =1 when created! */
- ngx_rtmp_ref_set(b->start, 1);
+ ngx_rtmp_ref_set(out, 1);
return out;
}
-void
-ngx_rtmp_acquire_shared_chain(ngx_chain_t *in)
-{
- ngx_rtmp_ref_get(in->buf->start);
-}
-
-
void
ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *in)
{
ngx_chain_t *cl;
- if (ngx_rtmp_ref_put(in->buf->start)) {
+ if (ngx_rtmp_ref_put(in)) {
return;
}
Please sign in to comment.
Something went wrong with that request. Please try again.