Skip to content
Browse files

fully tunable auto-push feature

  • Loading branch information...
1 parent 4e475cc commit b960e68a0b6857b4b6bb90457bc25612ae111c06 @arut committed Jul 19, 2012
Showing with 533 additions and 31 deletions.
  1. +2 −0 config
  2. +489 −0 ngx_rtmp_auto_push_module.c
  3. +9 −30 ngx_rtmp_relay_module.c
  4. +33 −1 ngx_rtmp_relay_module.h
View
2 config
@@ -13,6 +13,7 @@ CORE_MODULES="$CORE_MODULES
ngx_rtmp_exec_module \
ngx_rtmp_codec_module \
ngx_rtmp_play_module \
+ ngx_rtmp_auto_push_module \
"
@@ -43,6 +44,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp_exec_module.c \
$ngx_addon_dir/ngx_rtmp_codec_module.c \
$ngx_addon_dir/ngx_rtmp_play_module.c \
+ $ngx_addon_dir/ngx_rtmp_auto_push_module.c \
"
CFLAGS="$CFLAGS -I$ngx_addon_dir"
View
489 ngx_rtmp_auto_push_module.c
@@ -0,0 +1,489 @@
+/*
+ * Copyright (c) 2012 Roman Arutyunyan
+ */
+
+
+#include "ngx_rtmp_cmd_module.h"
+#include "ngx_rtmp_relay_module.h"
+
+
+static ngx_rtmp_publish_pt next_publish;
+static ngx_rtmp_delete_stream_pt next_delete_stream;
+
+
+static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle);
+static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle);
+static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf);
+static char * ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf);
+static ngx_int_t ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s,
+ ngx_rtmp_publish_t *v);
+static ngx_int_t ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s,
+ ngx_rtmp_delete_stream_t *v);
+
+
+typedef struct ngx_rtmp_auto_push_ctx_s ngx_rtmp_auto_push_ctx_t;
+
+struct ngx_rtmp_auto_push_ctx_s {
+ ngx_int_t *slots; /* NGX_MAX_PROCESSES */
+ ngx_str_t name;
+ ngx_event_t push_evt;
+};
+
+
+typedef struct {
+ ngx_flag_t auto_push;
+ ngx_str_t socket_dir;
+ ngx_msec_t push_reconnect;
+} ngx_rtmp_auto_push_conf_t;
+
+
+static ngx_command_t ngx_rtmp_auto_push_commands[] = {
+
+ { ngx_string("rtmp_auto_push"),
+ NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_flag_slot,
+ 0,
+ offsetof(ngx_rtmp_auto_push_conf_t, auto_push),
+ NULL },
+
+ { ngx_string("rtmp_auto_push_reconnect"),
+ NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ 0,
+ offsetof(ngx_rtmp_auto_push_conf_t, push_reconnect),
+ NULL },
+
+ { ngx_string("rtmp_socket_dir"),
+ NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ 0,
+ offsetof(ngx_rtmp_auto_push_conf_t, socket_dir),
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_core_module_t ngx_rtmp_auto_push_module_ctx = {
+ ngx_string("rtmp_auto_push"),
+ ngx_rtmp_auto_push_create_conf, /* create conf */
+ ngx_rtmp_auto_push_init_conf /* init conf */
+};
+
+
+ngx_module_t ngx_rtmp_auto_push_module = {
+ NGX_MODULE_V1,
+ &ngx_rtmp_auto_push_module_ctx, /* module context */
+ ngx_rtmp_auto_push_commands, /* module directives */
+ NGX_CORE_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ ngx_rtmp_auto_push_init_process, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ ngx_rtmp_auto_push_exit_process, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+#define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-rtmp"
+#define NGX_RTMP_AUTO_PUSH_PAGEURL "nginx-auto-push"
+
+
+static ngx_int_t
+ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle)
+{
+#if (NGX_HAVE_UNIX_DOMAIN)
+ ngx_rtmp_auto_push_conf_t *apcf;
+ ngx_listening_t *ls, *lss;
+ struct sockaddr_un *sun;
+ int reuseaddr;
+ ngx_socket_t s;
+ size_t n;
+ ngx_file_info_t fi;
+
+
+ apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
+ ngx_rtmp_auto_push_module);
+ if (apcf->auto_push == 0) {
+ return NGX_OK;
+ }
+
+
+ next_publish = ngx_rtmp_publish;
+ ngx_rtmp_publish = ngx_rtmp_auto_push_publish;
+
+ next_delete_stream = ngx_rtmp_delete_stream;
+ ngx_rtmp_delete_stream = ngx_rtmp_auto_push_delete_stream;
+
+
+ reuseaddr = 1;
+ s = (ngx_socket_t) -1;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
+ "auto_push: creating sockets");
+
+ /*TODO: clone all RTMP listenings? */
+ ls = cycle->listening.elts;
+ lss = NULL;
+ for (n = 0; n < cycle->listening.nelts; ++n, ++ls) {
+ if (ls->handler == ngx_rtmp_init_connection) {
+ lss = ls;
+ break;
+ }
+ }
+
+ if (lss == NULL) {
+ return NGX_OK;
+ }
+
+ ls = ngx_array_push(&cycle->listening);
+ if (ls == NULL) {
+ return NGX_ERROR;
+ }
+
+ *ls = *lss;
+
+ ls->socklen = sizeof(struct sockaddr_un);
+ sun = ngx_pcalloc(cycle->pool, ls->socklen);
+ ls->sockaddr = (struct sockaddr *) sun;
+ if (ls->sockaddr == NULL) {
+ return NGX_ERROR;
+ }
+ sun->sun_family = AF_UNIX;
+ *ngx_snprintf((u_char *) sun->sun_path, sizeof(sun->sun_path),
+ "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i",
+ &apcf->socket_dir, ngx_process_slot)
+ = 0;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
+ "auto_push: create socket '%s'",
+ sun->sun_path);
+
+ if (ngx_file_info(sun->sun_path, &fi) != ENOENT) {
+ ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
+ "auto_push: delete existing socket '%s'",
+ sun->sun_path);
+ ngx_delete_file(sun->sun_path);
+ }
+
+ ngx_str_set(&ls->addr_text, "worker_socket");
+
+ s = ngx_socket(AF_UNIX, SOCK_STREAM, 0);
+ if (s == -1) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
+ ngx_socket_n " worker_socket failed");
+ return NGX_ERROR;
+ }
+
+ if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
+ (const void *) &reuseaddr, sizeof(int))
+ == -1)
+ {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
+ "setsockopt(SO_REUSEADDR) worker_socket failed");
+ goto sock_error;
+ }
+
+ if (!(ngx_event_flags & NGX_USE_AIO_EVENT)) {
+ if (ngx_nonblocking(s) == -1) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
+ ngx_nonblocking_n " worker_socket failed");
+ return NGX_ERROR;
+ }
+ }
+
+ if (bind(s, sun, sizeof(*sun)) == -1) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
+ ngx_nonblocking_n " worker_socket bind failed");
+ goto sock_error;
+ }
+
+ if (listen(s, NGX_LISTEN_BACKLOG) == -1) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
+ "listen() to worker_socket, backlog %d failed",
+ NGX_LISTEN_BACKLOG);
+ goto sock_error;
+ }
+
+ ls->fd = s;
+ ls->listen = 1;
+
+ return NGX_OK;
+
+sock_error:
+ if (s != (ngx_socket_t) -1 && ngx_close_socket(s) == -1) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
+ ngx_close_socket_n " worker_socket failed");
+ }
+ ngx_delete_file(sun->sun_path);
+
+ return NGX_ERROR;
+
+#else /* NGX_HAVE_UNIX_DOMAIN */
+
+ return NGX_OK;
+
+#endif /* NGX_HAVE_UNIX_DOMAIN */
+}
+
+
+static void
+ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle)
+{
+#if (NGX_HAVE_UNIX_DOMAIN)
+ ngx_rtmp_auto_push_conf_t *apcf;
+ u_char path[NGX_MAX_PATH];
+
+ apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
+ ngx_rtmp_auto_push_module);
+ if (apcf->auto_push == 0) {
+ return;
+ }
+ *ngx_snprintf(path, sizeof(path),
+ "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i",
+ &apcf->socket_dir, ngx_process_slot)
+ = 0;
+
+ ngx_delete_file(path);
+
+#endif
+}
+
+
+static void *
+ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cycle)
+{
+ ngx_rtmp_auto_push_conf_t *apcf;
+
+ apcf = ngx_pcalloc(cycle->pool, sizeof(ngx_rtmp_auto_push_conf_t));
+ if (apcf == NULL) {
+ return NULL;
+ }
+
+ apcf->auto_push = NGX_CONF_UNSET;
+ apcf->push_reconnect = NGX_CONF_UNSET;
+
+ return apcf;
+}
+
+
+static char *
+ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf)
+{
+ ngx_rtmp_auto_push_conf_t *apcf = conf;
+
+ ngx_conf_init_value(apcf->auto_push, 0);
+ ngx_conf_init_msec_value(apcf->push_reconnect, 100);
+
+ if (apcf->socket_dir.len == 0) {
+ ngx_str_set(&apcf->socket_dir, "/tmp");
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+static void
+ngx_rtmp_auto_push_reconnect(ngx_event_t *ev)
+{
+ ngx_rtmp_session_t *s = ev->data;
+
+ ngx_rtmp_auto_push_conf_t *apcf;
+ ngx_rtmp_auto_push_ctx_t *ctx;
+ ngx_int_t *slot;
+ ngx_int_t n;
+ ngx_rtmp_relay_target_t at;
+ u_char path[sizeof("unix:") + NGX_MAX_PATH];
+ u_char flash_ver[sizeof("APSH.") + 2
+ + NGX_OFF_T_LEN * 2];
+ u_char *p;
+ ngx_str_t *u;
+ ngx_pid_t pid;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
+ "auto_push: reconnect");
+
+ apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
+ ngx_rtmp_auto_push_module);
+ ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module);
+ if (ctx == NULL) {
+ return;
+ }
+
+ ngx_memzero(&at, sizeof(at));
+ ngx_str_set(&at.page_url, NGX_RTMP_AUTO_PUSH_PAGEURL);
+ at.tag = &ngx_rtmp_auto_push_module;
+
+ slot = ctx->slots;
+
+ for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) {
+ if (n == ngx_process_slot) {
+ continue;
+ }
+
+ pid = ngx_processes[n].pid;
+ if (pid == 0 || pid == -1) {
+ continue;
+ }
+
+ if (*slot) {
+ continue;
+ }
+
+ at.data = &ngx_processes[n];
+
+ ngx_memzero(&at.url, sizeof(at.url));
+ u = &at.url.url;
+ p = ngx_snprintf(path, sizeof(path) - 1,
+ "unix:%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i",
+ &apcf->socket_dir, n);
+ *p = 0;
+ u->data = path;
+ u->len = p - path;
+ if (ngx_parse_url(s->connection->pool, &at.url) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
+ "auto_push: auto-push parse_url failed "
+ "url='%V' name='%V'",
+ u, &ctx->name);
+ continue;
+ }
+
+ p = ngx_snprintf(flash_ver, sizeof(flash_ver) - 1, "APSH,%i,%i",
+ (ngx_int_t) ngx_process_slot, (ngx_int_t) ngx_pid);
+ at.flash_ver.data = flash_ver;
+ at.flash_ver.len = p - flash_ver;
+
+ ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
+ "auto_push: connect slot=%i pid=%i socket='%s' "
+ "name='%V'",
+ n, (ngx_int_t) pid, path, &ctx->name);
+
+ if (ngx_rtmp_relay_push(s, &ctx->name, &at) == NGX_OK) {
+ *slot = 1;
+ continue;
+ }
+
+ ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
+ "auto_push: connect failed: slot=%i pid=%i socket='%s'"
+ "url='%V' name='%V'",
+ n, (ngx_int_t) pid, path, u, &ctx->name);
+
+ if (!ctx->push_evt.timer_set) {
+ ngx_add_timer(&ctx->push_evt, apcf->push_reconnect);
+ }
+ }
+}
+
+
+static ngx_int_t
+ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
+{
+ ngx_rtmp_auto_push_conf_t *apcf;
+ ngx_rtmp_auto_push_ctx_t *ctx;
+
+ apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
+ ngx_rtmp_auto_push_module);
+ if (apcf->auto_push == 0) {
+ goto next;
+ }
+
+ /* auto-push from another worker? */
+ if (s->page_url.len == sizeof(NGX_RTMP_AUTO_PUSH_PAGEURL) - 1 &&
+ ngx_memcmp(s->page_url.data, NGX_RTMP_AUTO_PUSH_PAGEURL,
+ s->page_url.len) == 0)
+ {
+ goto next;
+ }
+
+ ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module);
+ if (ctx == NULL) {
+ ctx = ngx_palloc(s->connection->pool,
+ sizeof(ngx_rtmp_auto_push_ctx_t));
+ if (ctx == NULL) {
+ goto next;
+ }
+ ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_auto_push_module);
+
+ }
+ ngx_memzero(ctx, sizeof(*ctx));
+
+ ctx->push_evt.data = s;
+ ctx->push_evt.log = s->connection->log;
+ ctx->push_evt.handler = ngx_rtmp_auto_push_reconnect;
+
+ ctx->slots = ngx_pcalloc(s->connection->pool,
+ sizeof(ngx_int_t) * NGX_MAX_PROCESSES);
+ if (ctx->slots == NULL) {
+ goto next;
+ }
+
+ ctx->name.len = ngx_strlen(v->name);
+ ctx->name.data = ngx_palloc(s->connection->pool, ctx->name.len);
+ if (ctx->name.data == NULL) {
+ goto next;
+ }
+ ngx_memcpy(ctx->name.data, v->name, ctx->name.len);
+
+ ngx_rtmp_auto_push_reconnect(&ctx->push_evt);
+
+next:
+ return next_publish(s, v);
+}
+
+
+static ngx_int_t
+ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s,
+ ngx_rtmp_delete_stream_t *v)
+{
+ ngx_rtmp_auto_push_conf_t *apcf;
+ ngx_rtmp_auto_push_ctx_t *ctx, *pctx;
+ ngx_rtmp_relay_ctx_t *rctx;
+ ngx_int_t slot;
+
+ apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
+ ngx_rtmp_auto_push_module);
+ if (apcf->auto_push == 0) {
+ goto next;
+ }
+
+ ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module);
+ if (ctx) {
+ if (ctx->push_evt.timer_set) {
+ ngx_del_timer(&ctx->push_evt);
+ }
+ goto next;
+ }
+
+ /* skip non-relays & publishers */
+ rctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
+ if (rctx == NULL ||
+ rctx->tag != &ngx_rtmp_auto_push_module ||
+ rctx->publish == NULL)
+ {
+ goto next;
+ }
+
+ slot = (ngx_process_t *) rctx->data - &ngx_processes[0];
+
+ ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
+ "auto_push: disconnect slot=%i app='%V' name='%V'",
+ slot, &rctx->app, &rctx->name);
+
+ pctx = ngx_rtmp_get_module_ctx(rctx->publish->session,
+ ngx_rtmp_auto_push_module);
+ if (pctx == NULL) {
+ goto next;
+ }
+
+ pctx->slots[slot] = 0;
+
+ /* push reconnect */
+ if (!pctx->push_evt.timer_set) {
+ ngx_add_timer(&pctx->push_evt, apcf->push_reconnect);
+ }
+
+next:
+ return next_delete_stream(s, v);
+}
View
39 ngx_rtmp_relay_module.c
@@ -36,33 +36,6 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
*/
-typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
-
-struct ngx_rtmp_relay_ctx_s {
- ngx_str_t name;
- ngx_str_t url;
- ngx_log_t log;
- ngx_rtmp_session_t *session;
- ngx_rtmp_relay_ctx_t *publish;
- ngx_rtmp_relay_ctx_t *play;
- ngx_rtmp_relay_ctx_t *next;
- unsigned relay:1;
-
- ngx_str_t app;
- ngx_str_t tc_url;
- ngx_str_t page_url;
- ngx_str_t swf_url;
- ngx_str_t flash_ver;
- ngx_str_t play_path;
- ngx_int_t live;
- ngx_int_t start;
- ngx_int_t stop;
-
- ngx_event_t push_evt;
- void *tag;
-};
-
-
typedef struct {
ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */
ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */
@@ -221,7 +194,9 @@ ngx_rtmp_relay_reconnect(ngx_event_t *ev)
}
for (pctx = ctx->play; pctx; pctx = pctx->next) {
- if (pctx->tag == target) {
+ if (pctx->tag == &ngx_rtmp_relay_module &&
+ pctx->data == target)
+ {
break;
}
}
@@ -318,6 +293,7 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
}
rctx->tag = target->tag;
+ rctx->data = target->data;
#define NGX_RTMP_RELAY_STR_COPY(to, from) \
if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) { \
@@ -1239,7 +1215,9 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v)
&ctx->app, &ctx->name);
/* push reconnect */
- if (ctx->relay && ctx->tag && !ctx->publish->push_evt.timer_set) {
+ if (ctx->relay && ctx->tag == &ngx_rtmp_relay_module &&
+ !ctx->publish->push_evt.timer_set)
+ {
ngx_add_timer(&ctx->publish->push_evt, racf->push_reconnect);
}
@@ -1326,7 +1304,8 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
*t = target;
- target->tag = target;
+ target->tag = &ngx_rtmp_relay_module;
+ target->data = target;
u = &target->url;
u->default_port = 1935;
View
34 ngx_rtmp_relay_module.h
@@ -23,10 +23,42 @@ typedef struct {
ngx_int_t start;
ngx_int_t stop;
- void *tag;
+ void *tag; /* usually module reference */
+ void *data; /* module-specific data */
} ngx_rtmp_relay_target_t;
+typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
+
+struct ngx_rtmp_relay_ctx_s {
+ ngx_str_t name;
+ ngx_str_t url;
+ ngx_log_t log;
+ ngx_rtmp_session_t *session;
+ ngx_rtmp_relay_ctx_t *publish;
+ ngx_rtmp_relay_ctx_t *play;
+ ngx_rtmp_relay_ctx_t *next;
+ unsigned relay:1;
+
+ ngx_str_t app;
+ ngx_str_t tc_url;
+ ngx_str_t page_url;
+ ngx_str_t swf_url;
+ ngx_str_t flash_ver;
+ ngx_str_t play_path;
+ ngx_int_t live;
+ ngx_int_t start;
+ ngx_int_t stop;
+
+ ngx_event_t push_evt;
+ void *tag;
+ void *data;
+};
+
+
+extern ngx_module_t ngx_rtmp_relay_module;
+
+
ngx_int_t ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target);
ngx_int_t ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,

0 comments on commit b960e68

Please sign in to comment.
Something went wrong with that request. Please try again.