Permalink
Browse files

http: Let full mux dump use the normal straeming code

Fix some locking issues in the http streaming code
  • Loading branch information...
1 parent 183aeb6 commit ed36bc1334b48db0c65d1194db560eab6c019595 @andoma committed Oct 25, 2012
Showing with 29 additions and 111 deletions.
  1. +13 −12 src/muxer_pass.c
  2. +16 −99 src/webui/webui.c
View
@@ -107,7 +107,8 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss)
pass_muxer_t *pm = (pass_muxer_t*)m;
const source_info_t *si = &ss->ss_si;
- if(si->si_type == S_MPEG_TS) {
+ if(si->si_type == S_MPEG_TS && ss->ss_pmt_pid) {
+ pm->pm_pat = realloc(pm->pm_pat, 188);
memset(pm->pm_pat, 0xff, 188);
pm->pm_pat[0] = 0x47;
pm->pm_pat[1] = 0x40;
@@ -120,6 +121,7 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss)
return -1;
}
+ pm->pm_pmt = realloc(pm->pm_pmt, 188);
memset(pm->pm_pmt, 0xff, 188);
pm->pm_pmt[0] = 0x47;
pm->pm_pmt[1] = 0x40 | (ss->ss_pmt_pid >> 8);
@@ -216,14 +218,16 @@ pass_muxer_write_ts(muxer_t *m, pktbuf_t *pb)
pass_muxer_t *pm = (pass_muxer_t*)m;
int rem;
- // Inject pmt and pat into the stream
- rem = pm->pm_pc % TS_INJECTION_RATE;
- if(!rem) {
- pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
- pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
- pass_muxer_write(m, pm->pm_pmt, 188);
- pass_muxer_write(m, pm->pm_pat, 188);
- pm->pm_ic++;
+ if(pm->pm_pat != NULL) {
+ // Inject pmt and pat into the stream
+ rem = pm->pm_pc % TS_INJECTION_RATE;
+ if(!rem) {
+ pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
+ pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
+ pass_muxer_write(m, pm->pm_pmt, 188);
+ pass_muxer_write(m, pm->pm_pat, 188);
+ pm->pm_ic++;
+ }
}
pass_muxer_write(m, pb->pb_data, pb->pb_size);
@@ -331,9 +335,6 @@ pass_muxer_create(muxer_container_type_t mc)
pm->m_close = pass_muxer_close;
pm->m_destroy = pass_muxer_destroy;
- pm->pm_pat = malloc(188);
- pm->pm_pmt = malloc(188);
-
return (muxer_t *)pm;
}
View
@@ -141,8 +141,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
* HTTP stream loop
*/
static void
-http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
- th_subscription_t *s, muxer_container_type_t mc)
+http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
+ const char *name, muxer_container_type_t mc)
{
streaming_message_t *sm;
int run = 1;
@@ -153,17 +153,11 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
struct timeval tp;
int err = 0;
socklen_t errlen = sizeof(err);
- const char *name;
mux = muxer_create(mc);
if(muxer_open_stream(mux, hc->hc_fd))
run = 0;
- if(s->ths_channel)
- name = s->ths_channel->ch_name;
- else
- name = "Live Stream";
-
/* reduce timeout on write() for streaming */
tp.tv_sec = 5;
tp.tv_usec = 0;
@@ -265,76 +259,6 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
}
-
-/**
- * HTTP stream loop
- */
-static void
-http_stream_run2(http_connection_t *hc, streaming_queue_t *sq)
-{
- streaming_message_t *sm;
- int run = 1;
- int timeouts = 0;
- struct timespec ts;
- struct timeval tp;
- int err = 0;
- socklen_t errlen = sizeof(err);
-
- /* reduce timeout on write() for streaming */
- tp.tv_sec = 5;
- tp.tv_usec = 0;
- setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp));
- http_output_content(hc, "application/octet-stream");
-
- while(run) {
- pthread_mutex_lock(&sq->sq_mutex);
- sm = TAILQ_FIRST(&sq->sq_queue);
- if(sm == NULL) {
- gettimeofday(&tp, NULL);
- ts.tv_sec = tp.tv_sec + 1;
- ts.tv_nsec = tp.tv_usec * 1000;
-
- if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {
- timeouts++;
-
- //Check socket status
- getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);
- if(err) {
- tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig);
- run = 0;
- }else if(timeouts >= 20) {
- tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
- run = 0;
- }
- }
- pthread_mutex_unlock(&sq->sq_mutex);
- continue;
- }
-
- timeouts = 0; //Reset timeout counter
- TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
- pthread_mutex_unlock(&sq->sq_mutex);
-
- pktbuf_t *pb;
-
- switch(sm->sm_type) {
- case SMT_MPEGTS:
- pb = sm->sm_data;
- if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) {
- tvhlog(LOG_DEBUG, "webui", "Write error %s, stopping", hc->hc_url_orig);
- run = 0;
- }
- break;
- default:
- break;
- }
-
- streaming_msg_free(sm);
- }
-}
-
-
-
/**
* Output a playlist containing a single channel
*/
@@ -617,7 +541,8 @@ http_stream_service(http_connection_t *hc, service_t *service)
muxer_container_type_t mc;
int flags;
const char *str;
- size_t qsize ;
+ size_t qsize;
+ const char *name;
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
if(mc == MC_UNKNOWN) {
@@ -644,15 +569,14 @@ http_stream_service(http_connection_t *hc, service_t *service)
flags = 0;
}
- pthread_mutex_lock(&global_lock);
s = subscription_create_from_service(service, "HTTP", st, flags);
- pthread_mutex_unlock(&global_lock);
-
if(s) {
- http_stream_run(hc, &sq, s, mc);
+ name = strdupa(service->s_ch ?
+ service->s_ch->ch_name : service->s_nicename);
+ pthread_mutex_unlock(&global_lock);
+ http_stream_run(hc, &sq, name, mc);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
- pthread_mutex_unlock(&global_lock);
}
if(gh)
@@ -675,19 +599,15 @@ http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi)
{
th_subscription_t *s;
streaming_queue_t sq;
-
+ const char *name;
streaming_queue_init(&sq, SMT_PACKET);
- pthread_mutex_lock(&global_lock);
s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st);
+ name = strdupa(tdmi->tdmi_identifier);
pthread_mutex_unlock(&global_lock);
-
- http_stream_run2(hc, &sq);
-
-
+ http_stream_run(hc, &sq, name, MC_PASS);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
- pthread_mutex_unlock(&global_lock);
streaming_queue_deinit(&sq);
@@ -712,6 +632,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
muxer_container_type_t mc;
char *str;
size_t qsize;
+ const char *name;
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
if(mc == MC_UNKNOWN) {
@@ -738,16 +659,15 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
flags = 0;
}
- pthread_mutex_lock(&global_lock);
s = subscription_create_from_channel(ch, priority, "HTTP", st, flags,
NULL, NULL, NULL);
- pthread_mutex_unlock(&global_lock);
if(s) {
- http_stream_run(hc, &sq, s, mc);
+ name = strdupa(ch->ch_name);
+ pthread_mutex_unlock(&global_lock);
+ http_stream_run(hc, &sq, name, mc);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
- pthread_mutex_unlock(&global_lock);
}
if(gh)
@@ -789,7 +709,7 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
http_deescape(components[1]);
- pthread_mutex_lock(&global_lock);
+ scopedgloballock();
if(!strcmp(components[0], "channelid")) {
ch = channel_find_by_identifier(atoi(components[1]));
@@ -801,9 +721,6 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
tdmi = dvb_mux_find_by_identifier(components[1]);
}
- // bug here: We can't retain pointers to channels etc outside global_lock
- pthread_mutex_unlock(&global_lock);
-
if(ch != NULL) {
return http_stream_channel(hc, ch);
} else if(service != NULL) {

0 comments on commit ed36bc1

Please sign in to comment.