Permalink
Browse files

implemented connection dropped in control module

  • Loading branch information...
1 parent c86e30f commit f65f07deb32565b144e22faece57638f8961d62f @arut committed Oct 23, 2012
Showing with 236 additions and 76 deletions.
  1. +236 −76 ngx_rtmp_control_module.c
View
312 ngx_rtmp_control_module.c
@@ -17,8 +17,22 @@ static char * ngx_rtmp_control_merge_loc_conf(ngx_conf_t *cf,
void *parent, void *child);
+typedef struct {
+ ngx_rtmp_core_main_conf_t *cmcf;
+ ngx_rtmp_core_srv_conf_t *cscf;
+ ngx_rtmp_core_app_conf_t *cacf;
+} ngx_rtmp_control_core_t;
+
+
+typedef struct {
+ ngx_rtmp_live_app_conf_t *lacf;
+ ngx_rtmp_live_stream_t *ls;
+} ngx_rtmp_control_live_t;
+
+
#define NGX_RTMP_CONTROL_ALL 0xff
#define NGX_RTMP_CONTROL_RECORD 0x01
+#define NGX_RTMP_CONTROL_DROP 0x02
typedef struct {
@@ -27,9 +41,10 @@ typedef struct {
static ngx_conf_bitmask_t ngx_rtmp_control_masks[] = {
- { ngx_string("all"), NGX_RTMP_CONTROL_ALL },
- { ngx_string("record"), NGX_RTMP_CONTROL_RECORD },
- { ngx_null_string, 0 }
+ { ngx_string("all"), NGX_RTMP_CONTROL_ALL },
+ { ngx_string("record"), NGX_RTMP_CONTROL_RECORD },
+ { ngx_string("drop"), NGX_RTMP_CONTROL_DROP },
+ { ngx_null_string, 0 }
};
@@ -77,127 +92,191 @@ ngx_module_t ngx_rtmp_control_module = {
};
-/* /record arguments:
- * srv - server index (optional)
- * app - application name
- * name - stream name
- * rec - recorder name
- */
-
-
static ngx_int_t
-ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
+ngx_rtmp_control_output_error(ngx_http_request_t *r, const char *msg)
{
- ngx_rtmp_record_app_conf_t *racf;
- ngx_rtmp_core_main_conf_t *cmcf;
- ngx_rtmp_core_srv_conf_t **pcscf, *cscf;
- ngx_rtmp_core_app_conf_t **pcacf, *cacf;
- ngx_rtmp_live_app_conf_t *lacf;
- ngx_rtmp_live_stream_t *ls;
- ngx_rtmp_live_ctx_t *lctx;
- ngx_rtmp_session_t *s;
- ngx_chain_t cl;
- ngx_uint_t sn, rn, n;
- ngx_str_t srv, app, rec, name, path;
- ngx_str_t msg;
- ngx_buf_t *b;
- ngx_int_t rc;
- size_t len;
+ size_t len;
+ ngx_buf_t *b;
+ ngx_chain_t cl;
- sn = 0;
- if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
- sn = ngx_atoi(srv.data, srv.len);
- }
+ len = ngx_strlen(msg);
- if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) {
- ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
- "rtmp_control: app not specified");
- ngx_str_set(&msg, "Application not specified");
- goto error;
+ r->headers_out.status = NGX_HTTP_BAD_REQUEST;
+ r->headers_out.content_length_n = len;
+
+ b = ngx_calloc_buf(r->pool);
+ if (b == NULL) {
+ return NGX_ERROR;
}
- ngx_memzero(&rec, sizeof(rec));
- ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec);
+ ngx_memzero(&cl, sizeof(cl));
+ cl.buf = b;
- ngx_memzero(&name, sizeof(name));
- ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
+ b->start = b->pos = (u_char *) msg;
+ b->end = b->last = (u_char *) msg + len;
+ b->memory = 1;
+ b->last_buf = 1;
- cmcf = ngx_rtmp_core_main_conf;
- if (cmcf == NULL) {
- ngx_str_set(&msg, "Missing main RTMP conf");
- goto error;
+ ngx_http_send_header(r);
+
+ return ngx_http_output_filter(r, &cl);
+}
+
+
+static const char *
+ngx_rtmp_control_parse_core(ngx_http_request_t *r,
+ ngx_rtmp_control_core_t *core)
+{
+ ngx_str_t srv, app;
+ ngx_uint_t sn, n;
+ ngx_rtmp_core_srv_conf_t **pcscf;
+ ngx_rtmp_core_app_conf_t **pcacf;
+
+
+ core->cmcf = ngx_rtmp_core_main_conf;
+ if (core->cmcf == NULL) {
+ return "Missing main RTMP conf";
}
/* find server */
- if (sn >= cmcf->servers.nelts) {
- ngx_str_set(&msg, "Server index out of range");
- goto error;
+ sn = 0;
+
+ if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
+ sn = ngx_atoi(srv.data, srv.len);
+ }
+
+ if (sn >= core->cmcf->servers.nelts) {
+ return "Server index out of range";
}
- pcscf = cmcf->servers.elts;
+ pcscf = core->cmcf->servers.elts;
pcscf += sn;
- cscf = *pcscf;
+
+ core->cscf = *pcscf;
/* find application */
- pcacf = cscf->applications.elts;
- cacf = NULL;
+ if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "rtmp_control: app not specified");
+ return "Application not specified";
+ }
- for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) {
+ core->cacf = NULL;
+
+ pcacf = core->cscf->applications.elts;
+
+ for (n = 0; n < core->cscf->applications.nelts; ++n, ++pcacf) {
if ((*pcacf)->name.len == app.len &&
ngx_strncmp((*pcacf)->name.data, app.data, app.len) == 0)
{
- cacf = *pcacf;
+ core->cacf = *pcacf;
break;
}
}
- if (cacf == NULL) {
- ngx_str_set(&msg, "Application not found");
- goto error;
+ if (core->cacf == NULL) {
+ return "Application not found";
}
- lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
- racf = cacf->app_conf[ngx_rtmp_record_module.ctx_index];
+ return NGX_CONF_OK;
+}
+
+
+static const char *
+ngx_rtmp_control_parse_live(ngx_http_request_t *r,
+ ngx_rtmp_control_core_t *core,
+ ngx_rtmp_control_live_t *live)
+{
+ ngx_str_t name;
+ size_t len;
+
+ ngx_memzero(&name, sizeof(name));
+ ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
+
+ live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index];
/* find live stream by name */
- for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets];
- ls; ls = ls->next)
+ for (live->ls = live->lacf->streams[ngx_hash_key(name.data, name.len) %
+ live->lacf->nbuckets];
+ live->ls; live->ls = live->ls->next)
{
- len = ngx_strlen(ls->name);
+ len = ngx_strlen(live->ls->name);
- if (name.len == len && ngx_strncmp(name.data, ls->name, name.len)
+ if (name.len == len && ngx_strncmp(name.data, live->ls->name, name.len)
== 0)
{
break;
}
}
- if (ls == NULL) {
- ngx_str_set(&msg, "Live stream not found");
+ if (live->ls == NULL) {
+ return "Live stream not found";
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+/* /record arguments:
+ * srv - server index (optional)
+ * app - application name
+ * name - stream name
+ * rec - recorder name
+ */
+
+
+static ngx_int_t
+ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
+{
+ ngx_rtmp_control_core_t core;
+ ngx_rtmp_control_live_t live;
+ ngx_rtmp_record_app_conf_t *racf;
+ ngx_rtmp_live_ctx_t *lctx;
+ ngx_rtmp_session_t *s;
+ ngx_chain_t cl;
+ ngx_uint_t rn;
+ ngx_str_t rec, path;
+ ngx_buf_t *b;
+ ngx_int_t rc;
+ const char *msg;
+
+ msg = ngx_rtmp_control_parse_core(r, &core);
+ if (msg != NGX_CONF_OK) {
+ goto error;
+ }
+
+ msg = ngx_rtmp_control_parse_live(r, &core, &live);
+ if (msg != NGX_CONF_OK) {
goto error;
}
/* find publisher context */
- for (lctx = ls->ctx; lctx; lctx = lctx->next) {
+ for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
break;
}
}
if (lctx == NULL) {
- ngx_str_set(&msg, "No publisher");
+ msg = "No publisher";
goto error;
}
s = lctx->session;
/* find recorder */
+ ngx_memzero(&rec, sizeof(rec));
+ ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec);
+
+ racf = core.cacf->app_conf[ngx_rtmp_record_module.ctx_index];
+
rn = ngx_rtmp_record_find(racf, &rec);
if (rn == NGX_CONF_UNSET_UINT) {
- ngx_str_set(&msg, "Recorder not found");
+ msg = "Recorder not found";
goto error;
}
+ /* call the method */
ngx_memzero(&path, sizeof(path));
if (method->len == sizeof("start") - 1 &&
@@ -211,12 +290,12 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
rc = ngx_rtmp_record_close(s, rn, &path);
} else {
- ngx_str_set(&msg, "Undefined method");
+ msg = "Undefined method";
goto error;
}
if (rc == NGX_ERROR) {
- ngx_str_set(&msg, "Recorder error");
+ msg = "Recorder error";
goto error;
}
@@ -245,25 +324,105 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
return ngx_http_output_filter(r, &cl);
error:
- r->headers_out.status = NGX_HTTP_BAD_REQUEST;
- r->headers_out.content_length_n = msg.len;
+ return ngx_rtmp_control_output_error(r, msg);
+}
+
+
+static ngx_int_t
+ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
+{
+ ngx_rtmp_control_core_t core;
+ ngx_rtmp_control_live_t live;
+ ngx_rtmp_live_ctx_t *lctx;
+ ngx_str_t addr, *paddr;
+ const char *msg;
+ ngx_uint_t ndropped;
+ size_t len;
+ u_char *p;
+ ngx_buf_t *b;
+ ngx_chain_t cl;
+
+ msg = ngx_rtmp_control_parse_core(r, &core);
+ if (msg != NGX_CONF_OK) {
+ goto error;
+ }
+
+ msg = ngx_rtmp_control_parse_live(r, &core, &live);
+ if (msg != NGX_CONF_OK) {
+ goto error;
+ }
+
+ ndropped = 0;
+
+ if (method->len == sizeof("publisher") - 1 &&
+ ngx_strncmp(method->data, "publisher", method->len) == 0)
+ {
+ for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
+ if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
+ ngx_rtmp_finalize_session(lctx->session);
+ ++ndropped;
+ break;
+ }
+ }
+
+ } else if (method->len == sizeof("client") - 1 &&
+ ngx_strncmp(method->data, "client", method->len) == 0)
+ {
+ ngx_memzero(&addr, sizeof(addr));
+ ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
+
+ for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
+ if (addr.len && lctx->session && lctx->session->connection) {
+ paddr = &lctx->session->connection->addr_text;
+ if (paddr->len != addr.len ||
+ ngx_strncmp(paddr->data, addr.data, addr.len))
+ {
+ continue;
+ }
+ }
+
+ ngx_rtmp_finalize_session(lctx->session);
+ ++ndropped;
+ }
+
+ } else {
+ msg = "Undefined method";
+ goto error;
+ }
+
+ /* output ndropped */
+
+ len = NGX_OFF_T_LEN;
+
+ p = ngx_palloc(r->connection->pool, len);
+ if (p == NULL) {
+ return NGX_ERROR;
+ }
+
+ len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p);
+
+ r->headers_out.status = NGX_HTTP_OK;
+ r->headers_out.content_length_n = len;
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
return NGX_ERROR;
}
+ b->start = b->pos = p;
+ b->end = b->last = p + len;
+ b->temporary = 1;
+ b->last_buf = 1;
+
ngx_memzero(&cl, sizeof(cl));
cl.buf = b;
- b->start = b->pos = msg.data;
- b->end = b->last = msg.data + msg.len;
- b->memory = 1;
- b->last_buf = 1;
-
ngx_http_send_header(r);
return ngx_http_output_filter(r, &cl);
+
+error:
+ return ngx_rtmp_control_output_error(r, msg);
}
@@ -315,6 +474,7 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
}
NGX_RTMP_CONTROL_SECTION(RECORD, record);
+ NGX_RTMP_CONTROL_SECTION(DROP, drop);
#undef NGX_RTMP_CONTROL_SECTION

0 comments on commit f65f07d

Please sign in to comment.