diff --git a/ffdecsa/ffdecsa.c b/ffdecsa/ffdecsa.c index ca6d109..2ca01ff 100644 --- a/ffdecsa/ffdecsa.c +++ b/ffdecsa/ffdecsa.c @@ -63,8 +63,7 @@ #define PARALLEL_MODE PARALLEL_128_SSE2 #elif defined(__mips__) || defined(__mips) || defined(__MIPS__) -//#define PARALLEL_MODE PARALLEL_64_LONG -#define PARALLEL_MODE PARALLEL_32_INT +#define PARALLEL_MODE PARALLEL_64_LONG #elif defined(__sh__) || defined(__SH4__) #define PARALLEL_MODE PARALLEL_32_INT @@ -75,7 +74,7 @@ #ifdef WITH_ARM_NEON #define PARALLEL_MODE PARALLEL_128_NEON #else -#define PARALLEL_MODE PARALLEL_32_INT +#define PARALLEL_MODE PARALLEL_64_LONG #endif #else diff --git a/globals.h b/globals.h index 1cab50c..7af2385 100644 --- a/globals.h +++ b/globals.h @@ -2378,6 +2378,7 @@ struct s_config int32_t emu_stream_relay_port; uint32_t emu_stream_ecm_delay; int8_t emu_stream_relay_enabled; + uint32_t emu_stream_relay_buffer_time; int8_t emu_stream_emm_enabled; CAIDTAB emu_stream_relay_ctab; // use the stream server for these caids #endif diff --git a/module-emulator-streamserver.c b/module-emulator-streamserver.c index e32c244..78599f7 100644 --- a/module-emulator-streamserver.c +++ b/module-emulator-streamserver.c @@ -37,6 +37,7 @@ char *emu_stream_source_auth = NULL; int32_t emu_stream_relay_port = 17999; int8_t emu_stream_emm_enabled = 0; uint32_t cluster_size = 50; +uint32_t suggested_cluster_size; static uint8_t emu_stream_server_mutex_init = 0; static pthread_mutex_t emu_stream_server_mutex; @@ -47,6 +48,11 @@ uint16_t emu_stream_cur_srvid[EMU_STREAM_SERVER_MAX_CONNECTIONS]; int8_t stream_server_has_ecm[EMU_STREAM_SERVER_MAX_CONNECTIONS]; pthread_mutex_t emu_fixed_key_data_mutex[EMU_STREAM_SERVER_MAX_CONNECTIONS]; +pthread_mutex_t emu_send_buffer_mutex[EMU_STREAM_SERVER_MAX_CONNECTIONS]; +uint8_t *send_buffer[EMU_STREAM_SERVER_MAX_CONNECTIONS]; +uint32_t send_buffer_length[EMU_STREAM_SERVER_MAX_CONNECTIONS] = {0}; +uint32_t send_buffer_offset[EMU_STREAM_SERVER_MAX_CONNECTIONS] = {0}; +uint8_t 
continue_sending[EMU_STREAM_SERVER_MAX_CONNECTIONS] = {0}; emu_stream_client_key_data emu_fixed_key_data[EMU_STREAM_SERVER_MAX_CONNECTIONS]; LLIST *ll_emu_stream_delayed_keys[EMU_STREAM_SERVER_MAX_CONNECTIONS]; @@ -1237,19 +1243,20 @@ static void DescrambleTsPacketsCompel(emu_stream_client_data *data, uint8_t *str } } -static void DescrambleTsPacketsICam(emu_stream_client_data *data, uint8_t *stream_buf, uint32_t bufLength, uint16_t packetSize) +static uint16_t DescrambleTsPacketsICam(emu_stream_client_data *data, uint8_t *stream_buf, uint32_t bufLength, uint16_t packetSize) { - uint8_t *packetCluster[4]; - uint8_t scrambled_packets = 0, scramblingControl; - uint32_t i, tsHeader; - int8_t odd_even = -1, odd_even_count = 1; + uint8_t *packetCluster[3]; + uint16_t retVal = bufLength / packetSize; + +#ifdef MODULE_RADEGAST + uint32_t i; + uint32_t tsHeader; for (i = 0; i < bufLength; i += packetSize) { tsHeader = b2i(4, stream_buf + i); - scramblingControl = (tsHeader & 0xC0) >> 6; -#ifdef MODULE_RADEGAST + uint16_t pid, offset, payloadStart; pid = (tsHeader & 0x1FFF00) >> 8; @@ -1272,30 +1279,8 @@ static void DescrambleTsPacketsICam(emu_stream_client_data *data, uint8_t *strea ParseTsData(0x80, 0xFE, 3, &data->have_ecm_data, data->ecm_data, sizeof(data->ecm_data), &data->ecm_data_pos, payloadStart, stream_buf + i + offset, packetSize - offset, ParseEcmData, data); } -#endif // MODULE_RADEGAST - - if (scramblingControl == 0) - { - continue; - } - - scrambled_packets++; - scramblingControl &= 0x1; - - if (odd_even == -1) - { - odd_even = scramblingControl; - } - - if (odd_even != scramblingControl) - { - odd_even_count++; - odd_even = scramblingControl; - } } - - if (scrambled_packets == 0) - return; +#endif // MODULE_RADEGAST SAFE_MUTEX_LOCK(&emu_fixed_key_data_mutex[data->connid]); @@ -1305,14 +1290,12 @@ static void DescrambleTsPacketsICam(emu_stream_client_data *data, uint8_t *strea packetCluster[1] = stream_buf + bufLength; packetCluster[2] = NULL; - 
decrypt_packets(emu_fixed_key_data[data->connid].icam_csa_ks, packetCluster); - if (odd_even_count > 1) // odd and even packets together cannot be decrypted in one step - { - decrypt_packets(emu_fixed_key_data[data->connid].icam_csa_ks, packetCluster); - } + retVal = decrypt_packets(emu_fixed_key_data[data->connid].icam_csa_ks, packetCluster); } SAFE_MUTEX_UNLOCK(&emu_fixed_key_data_mutex[data->connid]); + + return retVal; } static int32_t connect_to_stream(char *http_buf, int32_t http_buf_len, char *stream_path) @@ -1393,6 +1376,135 @@ static void stream_client_disconnect(emu_stream_client_conn_data *conndata) NULLFREE(conndata); } +int64_t parse_pcr(const uint8_t *tsPacket) { /* returns the 27 MHz PCR (base*300 + extension) of one TS packet, or INVALID_PCR if it carries none */ + if (tsPacket[0] != 0x47) { + return INVALID_PCR; + } + + if (!(tsPacket[3] & 0x20)) { + return INVALID_PCR; + } + + if ((tsPacket[4] >= 7) && (tsPacket[5] & 0x10)) { /* the PCR flag only exists if adaptation_field_length covers the flags byte plus the 6 PCR bytes; otherwise byte 5 is payload */ + uint16_t extension = 0; + uint64_t base = 0; + + base |= (uint64_t)(tsPacket[6])<<25; + base |= (uint64_t)(tsPacket[7])<<(25-8); + base |= (uint64_t)(tsPacket[8])<<(25-8-8); + base |= (uint64_t)(tsPacket[9])<<(25-8-8-8); + base |= (uint16_t)(tsPacket[10] & 128)>>7; + extension |= tsPacket[11]; + extension |= (uint16_t)(tsPacket[10] & 0x01)<<8; + + return base*300 + extension; + } + + return INVALID_PCR; +} + +uint8_t is_scrambled(const uint8_t *tsPacket) { + return tsPacket[3] & 0x80; +} + +uint8_t has_rai_flag(const uint8_t *tsPacket) { + if( ( tsPacket[3] & 0x20 ) && ( tsPacket[4] > 0 ) ) { + return tsPacket[5] & 0x40; + } + return 0; +} + +uint64_t find_first_pcr_value(const uint8_t *tsPackets, const uint32_t numPackets, const uint8_t packetSize) { + uint32_t packetNo; + uint64_t pcr = INVALID_PCR; + for (packetNo=0;packetNo0;) { + pcr = parse_pcr(tsPackets+packetNo*packetSize); + if (pcr != INVALID_PCR) { + return pcr; + } + } + return INVALID_PCR; +} + +uint64_t calc_pcr_timespan(const uint8_t *tsPackets, const uint32_t numPackets, const uint8_t packetSize) { + uint64_t first_pcr_value = find_first_pcr_value(tsPackets, numPackets, 
packetSize); + uint64_t last_pcr_value = find_last_pcr_value(tsPackets, numPackets, packetSize); + + return last_pcr_value - first_pcr_value + ((last_pcr_valueconnid; + int32_t connfd = ((emu_stream_client_conn_data *) arg)->connfd; + + cs_log("Stream client %i sender started!", connid); + while (continue_sending[connid]) { + // wait until we have enough packets in send buffer + while (continue_sending[connid] && cfg.emu_stream_relay_buffer_time > 0) { + SAFE_MUTEX_LOCK(&emu_send_buffer_mutex[connid]); + uint8_t *currentPacket = NULL, *descrambled_rai_packet = NULL; + uint64_t pcrdiff = 0; + uint32_t packetNo = 0; + for (currentPacket = send_buffer[connid]+send_buffer_offset[connid]; currentPacket<(send_buffer[connid]+send_buffer_length[connid]);currentPacket+=packetSize) { + if (has_rai_flag(currentPacket) && !is_scrambled(currentPacket)) { + descrambled_rai_packet = currentPacket; + break; + } + packetNo++; + } + + if (descrambled_rai_packet != NULL) { + pcrdiff = calc_pcr_timespan(descrambled_rai_packet, (send_buffer[connid]+send_buffer_length[connid]-descrambled_rai_packet) / packetSize, packetSize); + } + SAFE_MUTEX_UNLOCK(&emu_send_buffer_mutex[connid]); + if (pcrdiff > cfg.emu_stream_relay_buffer_time*(PCR_FREQ/1000)) { + cs_log("Stream client %i buffer time reached!", connid); + break; + } else { + } + cs_sleepms(35); // 35 ms is a typical timespan between two PCR packets + } + SAFE_MUTEX_LOCK(&emu_send_buffer_mutex[connid]); + while (continue_sending[connid]) { + uint16_t packetsToSend = (send_buffer_length[connid]-send_buffer_offset[connid]) / packetSize; + if (packetsToSend > 80) { + // limit bitrate to ~30mbit/s + packetsToSend = 80; + } + + ssize_t bytesSent = send(connfd, send_buffer[connid]+send_buffer_offset[connid], packetsToSend*packetSize, 0); + if (bytesSent == -1) { + cs_log_dbg(0, "Stream client %i could not send. 
Stopping sender!", connid); + continue_sending[connid] = 0; + break; + } else if (bytesSent != packetsToSend*packetSize) { /* short send: compare against the size actually requested above */ + cs_log("Stream client %i sent only %d bytes", connid, (int)bytesSent); + } + + send_buffer_offset[connid]+=(uint32_t)bytesSent; /* advance only past what the socket accepted so a short send does not drop stream data */ + SAFE_MUTEX_UNLOCK(&emu_send_buffer_mutex[connid]); + cs_sleepms(5); + SAFE_MUTEX_LOCK(&emu_send_buffer_mutex[connid]); + } + SAFE_MUTEX_UNLOCK(&emu_send_buffer_mutex[connid]); + } + cs_log("Stream client %i sender stopped!", connid); +} + static void *stream_client_handler(void *arg) { emu_stream_client_conn_data *conndata = (emu_stream_client_conn_data *)arg; @@ -1410,6 +1522,8 @@ static void *stream_client_handler(void *arg) uint16_t packetCount = 0, packetSize = 0, startOffset = 0; uint32_t remainingDataPos, remainingDataLength, tmp_pids[4]; + pthread_t sender_thread; + struct pollfd pfd[2]; int ret; @@ -1428,10 +1542,19 @@ static void *stream_client_handler(void *arg) return NULL; } + if (!cs_malloc(&send_buffer[conndata->connid], EMU_SEND_BUFFER_SIZE)) + { + NULLFREE(http_buf); + NULLFREE(stream_buf); + stream_client_disconnect(conndata); + return NULL; + } + if (!cs_malloc(&data, sizeof(emu_stream_client_data))) { NULLFREE(http_buf); NULLFREE(stream_buf); + NULLFREE(send_buffer[conndata->connid]); stream_client_disconnect(conndata); return NULL; } @@ -1441,6 +1564,7 @@ static void *stream_client_handler(void *arg) { NULLFREE(http_buf); NULLFREE(stream_buf); + NULLFREE(send_buffer[conndata->connid]); NULLFREE(data); stream_client_disconnect(conndata); return NULL; @@ -1451,6 +1575,7 @@ static void *stream_client_handler(void *arg) { NULLFREE(http_buf); NULLFREE(stream_buf); + NULLFREE(send_buffer[conndata->connid]); NULLFREE(data); stream_client_disconnect(conndata); return NULL; @@ -1485,6 +1610,7 @@ static void *stream_client_handler(void *arg) { NULLFREE(http_buf); NULLFREE(stream_buf); + NULLFREE(send_buffer[conndata->connid]); NULLFREE(data); stream_client_disconnect(conndata); return NULL; @@ -1527,6 +1653,11 @@ 
static void *stream_client_handler(void *arg) streamStatus = 0; bytesRead = 0; + send_buffer_length[conndata->connid] = 0; + send_buffer_offset[conndata->connid] = 0; + continue_sending[conndata->connid] = 1; + start_thread("emu stream client sender", stream_client_sender, conndata, &sender_thread, 0, 0); + while (!exit_oscam && clientStatus != -1 && streamStatus != -1 && streamConnectErrorCount < 3 && streamDataErrorCount < 15) { @@ -1537,8 +1668,8 @@ static void *stream_client_handler(void *arg) } else if (emu_fixed_key_data[conndata->connid].icam_csa_used) { - cur_dvb_buffer_size = 188 * cluster_size; - cur_dvb_buffer_wait = 188 * (cluster_size - 3); + cur_dvb_buffer_size = 188 * suggested_cluster_size; + cur_dvb_buffer_wait = 188 * suggested_cluster_size; } else { @@ -1546,12 +1677,15 @@ static void *stream_client_handler(void *arg) cur_dvb_buffer_wait = EMU_DVB_BUFFER_WAIT_DES; } + uint32_t packets_left_in_buffer = (packetSize > 0) ? bytesRead/packetSize:0; /* 32-bit on purpose: a uint8_t would wrap to 0 at 256 buffered packets and look like an empty buffer */ + uint8_t descrambling = data->have_pat_data == 1 && data->have_pmt_data == 1; + pfd[0].fd = streamfd; pfd[0].events = POLLIN | POLLRDHUP | POLLHUP; pfd[1].fd = conndata->connfd; pfd[1].events = POLLRDHUP | POLLHUP; - ret = poll(pfd, 2, 2000); + ret = poll(pfd, 2, (packets_left_in_buffer > 0 && descrambling) ? 
5:2000); if (ret < 0) // poll error { @@ -1562,9 +1696,14 @@ static void *stream_client_handler(void *arg) } else if (ret == 0) // timeout { - cs_log("WARNING: stream client %i no data from stream source", conndata->connid); - streamDataErrorCount++; // 2 sec timeout * 15 = 30 seconds no data -> close - continue; + if (packets_left_in_buffer > 0 && descrambling) { + // was fast timeout + streamStatus = 0; // we timed out, so skip recv() and pretend we received 0 bytes + } else { + cs_log("WARNING: stream client %i no data from stream source", conndata->connid); + streamDataErrorCount++; // 2 sec timeout * 15 = 30 seconds no data -> close + continue; + } } else { @@ -1601,7 +1740,6 @@ static void *stream_client_handler(void *arg) { cs_log_dbg(0, "WARNING: stream client %i non-full buffer from stream source", conndata->connid); streamDataErrorCount++; - cs_sleepms(100); } } else @@ -1612,7 +1750,7 @@ static void *stream_client_handler(void *arg) streamConnectErrorCount = 0; bytesRead += streamStatus; - if (bytesRead >= cur_dvb_buffer_wait) + if (bytesRead >= cur_dvb_buffer_wait || (packets_left_in_buffer > 0 && descrambling)) { startOffset = 0; @@ -1622,9 +1760,9 @@ static void *stream_client_handler(void *arg) SearchTsPackets(stream_buf, bytesRead, &packetSize, &startOffset); } - if (packetSize == 0) + if (packetSize == 0) // no ts packets found yet { - bytesRead = 0; + bytesRead = 0; // discard buffer } else { @@ -1649,7 +1787,8 @@ static void *stream_client_handler(void *arg) } else if (caid_is_icam(data->caid)) //ICAM { - DescrambleTsPacketsICam(data, stream_buf + startOffset, packetCount * packetSize, packetSize); + packetCount = DescrambleTsPacketsICam(data, stream_buf + startOffset, packetCount * packetSize, packetSize); + cs_sleepus(1500); } } else @@ -1663,7 +1802,18 @@ static void *stream_client_handler(void *arg) ParseTsPackets(data, stream_buf + startOffset, packetCount * packetSize, packetSize); } - clientStatus = send(conndata->connfd, stream_buf + 
startOffset, packetCount * packetSize, 0); + SAFE_MUTEX_LOCK(&emu_send_buffer_mutex[conndata->connid]); + if (send_buffer_length[conndata->connid] + packetCount * packetSize > EMU_SEND_BUFFER_SIZE) { + // reorganize sendbuffer + memmove(send_buffer[conndata->connid], send_buffer[conndata->connid] + send_buffer_offset[conndata->connid], send_buffer_length[conndata->connid] - send_buffer_offset[conndata->connid]); + send_buffer_length[conndata->connid] = send_buffer_length[conndata->connid] - send_buffer_offset[conndata->connid]; + send_buffer_offset[conndata->connid] = 0; + } + memcpy(send_buffer[conndata->connid] + send_buffer_length[conndata->connid], stream_buf + startOffset, packetCount * packetSize); + send_buffer_length[conndata->connid] += packetCount * packetSize; + SAFE_MUTEX_UNLOCK(&emu_send_buffer_mutex[conndata->connid]); + + clientStatus = packetCount * packetSize; remainingDataPos = startOffset + (packetCount * packetSize); remainingDataLength = bytesRead - remainingDataPos; @@ -1685,8 +1835,13 @@ static void *stream_client_handler(void *arg) close(streamfd); } + continue_sending[conndata->connid] = 0; + // wait for sender thread to finish + pthread_join(sender_thread, NULL); + NULLFREE(http_buf); NULLFREE(stream_buf); + NULLFREE(send_buffer[conndata->connid]); for (i = 0; i < 8; i++) { @@ -1695,10 +1850,6 @@ static void *stream_client_handler(void *arg) free_key_struct(data->key.pvu_csa_ks[i]); } } - if (data->key.icam_csa_ks) - { - free_key_struct(data->key.icam_csa_ks); - } #ifdef MODULE_RADEGAST icam_reset(data->connid); #endif @@ -1717,7 +1868,8 @@ void *stream_server(void *UNUSED(a)) emu_stream_client_conn_data *conndata; cluster_size = get_internal_parallelism(); - cs_log("INFO: FFDecsa parallel mode = %d", cluster_size); + suggested_cluster_size = get_suggested_cluster_size(); + cs_log("INFO: FFDecsa parallel mode = %d (suggested cluster size: %d packets)", cluster_size, suggested_cluster_size); if (!emu_stream_server_mutex_init) { diff --git 
a/module-emulator-streamserver.h b/module-emulator-streamserver.h index 5b76d0d..580e699 100644 --- a/module-emulator-streamserver.h +++ b/module-emulator-streamserver.h @@ -13,6 +13,15 @@ #define EMU_DVB_BUFFER_WAIT_DES 188*29 #define EMU_DVB_BUFFER_SIZE EMU_DVB_BUFFER_SIZE_CSA +#define EMU_SEND_BUFFER_SIZE (188*204*208*2) // might implement ring buffer in future + +#define INVALID_PCR ((uint64_t) 0xFFFFFFFFFFFFFFFF) +#define PCR_FREQ 27000000 +#define PCR_SCALE (((uint64_t)1 << 33) * 300) + +// milliseconds of mpeg-ts to buffer +#define INITIAL_BUFFER_TIME_MS 2500 + typedef struct { uint32_t pvu_des_ks[8][2][32]; @@ -85,6 +94,8 @@ extern pthread_mutex_t emu_fixed_key_data_mutex[EMU_STREAM_SERVER_MAX_CONNECTION extern emu_stream_client_key_data emu_fixed_key_data[EMU_STREAM_SERVER_MAX_CONNECTIONS]; extern LLIST *ll_emu_stream_delayed_keys[EMU_STREAM_SERVER_MAX_CONNECTIONS]; +extern pthread_mutex_t emu_send_buffer_mutex[EMU_STREAM_SERVER_MAX_CONNECTIONS]; + void *stream_key_delayer(void *arg); bool stream_write_cw(ECM_REQUEST *er); diff --git a/module-emulator.c b/module-emulator.c index 0cdb915..ce6630e 100644 --- a/module-emulator.c +++ b/module-emulator.c @@ -735,6 +735,7 @@ static int32_t emu_reader_init(struct s_reader *UNUSED(reader)) for (i = 0; i < EMU_STREAM_SERVER_MAX_CONNECTIONS; i++) { SAFE_MUTEX_INIT(&emu_fixed_key_data_mutex[i], NULL); + SAFE_MUTEX_INIT(&emu_send_buffer_mutex[i], NULL); ll_emu_stream_delayed_keys[i] = ll_create("ll_emu_stream_delayed_keys"); memset(&emu_fixed_key_data[i], 0, sizeof(emu_stream_client_key_data)); } diff --git a/module-webif.c b/module-webif.c index 9ecb13e..516e994 100644 --- a/module-webif.c +++ b/module-webif.c @@ -1315,6 +1315,7 @@ static char *send_oscam_config_streamrelay(struct templatevars *vars, struct uri { tpl_printf(vars, TPLADD, "STREAM_SOURCE_AUTH_PASSWORD", "%s", cfg.emu_stream_source_auth_password); } tpl_printf(vars, TPLADD, "STREAM_RELAY_PORT", "%d", cfg.emu_stream_relay_port); tpl_printf(vars, TPLADD, 
"STREAM_ECM_DELAY", "%d", cfg.emu_stream_ecm_delay); + tpl_printf(vars, TPLADD, "STREAM_RELAY_BUFFER_TIME", "%d", cfg.emu_stream_relay_buffer_time); tpl_printf(vars, TPLADD, "TMP", "STREAMRELAYENABLEDSELECTED%d", cfg.emu_stream_relay_enabled); tpl_addVar(vars, TPLADD, tpl_getVar(vars, "TMP"), "selected"); diff --git a/oscam-config-global.c b/oscam-config-global.c index 164601c..bf480ff 100644 --- a/oscam-config-global.c +++ b/oscam-config-global.c @@ -925,6 +925,7 @@ static const struct config_list streamrelay_opts[] = DEF_OPT_INT32("stream_relay_port" , OFS(emu_stream_relay_port), 17999), DEF_OPT_UINT32("stream_ecm_delay" , OFS(emu_stream_ecm_delay), 600), DEF_OPT_INT8("stream_relay_enabled" , OFS(emu_stream_relay_enabled), 1), + DEF_OPT_UINT32("stream_relay_buffer_time" , OFS(emu_stream_relay_buffer_time), 2000), DEF_OPT_INT8("stream_emm_enabled" , OFS(emu_stream_emm_enabled), 1), DEF_OPT_FUNC("stream_relay_ctab" , OFS(emu_stream_relay_ctab), check_caidtab_fn), DEF_LAST_OPT diff --git a/webif/config/streamrelay.html b/webif/config/streamrelay.html index 6fe5387..c9ec6bb 100644 --- a/webif/config/streamrelay.html +++ b/webif/config/streamrelay.html @@ -15,6 +15,7 @@ Source Stream User: Source Stream Password: Relay Port: + Relay buffer time: ECM fix delay: Process EMM from stream: