diff --git a/.gitignore b/.gitignore index b162af25b..366dd988e 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,4 @@ temp/ *.iml .idea -.fleet \ No newline at end of file +.fleet diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index c8a786a5e..e2a33a58e 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -100,6 +100,9 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter public static final String HOOK_ACTION_PUBLISH_TIMEOUT_ERROR = "publishTimeoutError"; public static final String HOOK_ACTION_ENCODER_NOT_OPENED_ERROR = "encoderNotOpenedError"; public static final String HOOK_ACTION_ENDPOINT_FAILED = "endpointFailed"; + public static final String HOOK_ACTION_START_PLAY = "playStart"; + public static final String HOOK_ACTION_STOP_PLAY = "playStop"; + public static final String HOOK_ACTION_START_RECORD = "recordStart"; public static final String STREAMS = "streams"; @@ -499,7 +502,8 @@ public void closeBroadcast(String streamId) { final String category = broadcast.getCategory(); final String metaData = broadcast.getMetaData(); logger.info("Setting timer to call live stream ended hook for stream:{}",streamId ); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, metaData)); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, null, null, null, metaData)); + } if (broadcast.isZombi()) { @@ -562,13 +566,67 @@ public void resetDASHStats(String streamId) { } } + public void sendStartPlayWebHook(final String streamId, final String viewerId, final String token, final String viewerType){ + final Broadcast broadcast = getDataStore().get(streamId); + if(broadcast == null){ + return; + } + final String listenerHookURL = getListenerHookURL(broadcast); + if (listenerHookURL == null || listenerHookURL.isEmpty()) { + return; + } + sendWebHook(broadcast, listenerHookURL, viewerId, viewerType, token, HOOK_ACTION_START_PLAY); + } + + public void sendStopPlayWebHook(final String streamId, final String viewerId, final String token, final String viewerType){ + final Broadcast broadcast = getDataStore().get(streamId); + if(broadcast == null){ + return; + } + final String listenerHookURL = getListenerHookURL(broadcast); + if (listenerHookURL == null || listenerHookURL.isEmpty()) { + return; + } + sendWebHook(broadcast, listenerHookURL, viewerId, viewerType, token, HOOK_ACTION_STOP_PLAY); + } + + public void sendStartRecordWebHook(final String streamId){ + final Broadcast broadcast = getDataStore().get(streamId); + if(broadcast == null){ + return; + } + final String listenerHookURL = getListenerHookURL(broadcast); + if (listenerHookURL == null || listenerHookURL.isEmpty()) { + return; + } + sendWebHook(broadcast, listenerHookURL, null, null,null, HOOK_ACTION_START_RECORD); + } + + private void sendWebHook(final Broadcast broadcast, final String listenerHookUrl, final String viewerId, final String viewerType, final String token, final String hookAction){ + final String streamId = broadcast.getStreamId(); + final String name = broadcast.getName(); + final String category = broadcast.getCategory(); + final String metaData = broadcast.getMetaData(); + logger.info("Setting timer to call {} hook for stream:{}", hookAction, streamId); + vertx.setTimer(10, e -> notifyHook(listenerHookUrl, streamId, hookAction, name, category, + null, null, viewerId, viewerType, token, metaData)); + } + + private String getRtmpViewerId(){ + return "rtmp_" + RandomStringUtils.randomNumeric(8); + } + @Override public void streamPlayItemPlay(ISubscriberStream stream, IPlayItem item, boolean isLive) { - vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), true)); + final String streamId = item.getName(); + sendStartPlayWebHook(streamId, getRtmpViewerId(), null, ViewerStats.RTMP_TYPE); + vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, true)); } @Override public void streamPlayItemStop(ISubscriberStream stream, IPlayItem item) { - vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), false)); + final String streamId = item.getName(); + sendStopPlayWebHook(streamId, getRtmpViewerId(), null, ViewerStats.RTMP_TYPE); + vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, false)); } @Override @@ -580,7 +638,6 @@ public void streamSubscriberClose(ISubscriberStream stream) { public void startPublish(String streamId, long absoluteStartTimeMs, String publishType) { vertx.executeBlocking( handler -> { try { - Broadcast broadcast = updateBroadcastStatus(streamId, absoluteStartTimeMs, publishType, getDataStore().get(streamId)); final String listenerHookURL = getListenerHookURL(broadcast); @@ -591,7 +648,13 @@ public void startPublish(String streamId, long absoluteStartTimeMs, String publi final String metaData = broadcast.getMetaData(); logger.info("Setting timer to call live stream started hook for stream:{}",streamId ); vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_LIVE_STREAM, name, category, - null, null, metaData)); + null, null, null, null, null, metaData)); + } + + if ((broadcast.getMp4Enabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM || broadcast.getWebMEnabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM) + || (appSettings.isMp4MuxingEnabled() || appSettings.isWebMMuxingEnabled()) + ) { + sendStartRecordWebHook(streamId); } int ingestingStreamLimit = appSettings.getIngestingStreamLimit(); @@ -760,7 +823,8 @@ public void muxingFinished(final String streamId, File file, long startTime, lon final String metaData = (broadcast != null) ? broadcast.getMetaData() : null; String finalListenerHookURL = listenerHookURL; logger.info("Setting timer for calling vod ready hook for stream:{}", streamId); - vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, metaData)); + vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, null, null, null, metaData)); + } String muxerFinishScript = appSettings.getMuxerFinishScript(); @@ -805,33 +869,24 @@ public static String getRelativePath(String filePath){ /** * Notify hook with parameters below - * - * @param url - * is the url of the service to be called - * - * @param id - * is the stream id that is unique for each stream - * - * @param action - * is the name of the action to be notified, it has values such - * as {@link #HOOK_ACTION_END_LIVE_STREAM} - * {@link #HOOK_ACTION_START_LIVE_STREAM} - * - * @param streamName, - * name of the stream. It is not the name of the file. It is just - * a user friendly name - * - * @param category, - * category of the stream - * - * @param vodName name of the vod - * - * @param vodId id of the vod in the datastore - * + * + * @param streamName, name of the stream. It is not the name of the file. It is just + * a user friendly name + * @param category, category of the stream + * @param url is the url of the service to be called + * @param id is the stream id that is unique for each stream + * @param action is the name of the action to be notified, it has values such + * as {@link #HOOK_ACTION_END_LIVE_STREAM} + * {@link #HOOK_ACTION_START_LIVE_STREAM} + * @param vodName name of the vod + * @param vodId id of the vod in the datastore + * @param viewerId session id or subscriber id + * @param viewerType + * @param token play token * @return */ public StringBuilder notifyHook(String url, String id, String action, String streamName, String category, - String vodName, String vodId, String metadata) { + String vodName, String vodId, String viewerId, String viewerType, String token, String metadata) { StringBuilder response = null; logger.info("Running notify hook url:{} stream id: {} action:{} vod name:{} vod id:{}", url, id, action, vodName, vodId); if (url != null && url.length() > 0) { @@ -854,7 +909,19 @@ public StringBuilder notifyHook(String url, String id, String action, String str variables.put("vodId", vodId); } - if (metadata != null) { + if(viewerId != null){ + variables.put("viewerId", viewerId); + } + + if(viewerType != null){ + variables.put("viewerType", viewerType); + } + + if(token !=null){ + variables.put("token",token); + } + + if (metadata != null && !metadata.isEmpty()) { variables.put("metadata", metadata); } @@ -869,7 +936,6 @@ public StringBuilder notifyHook(String url, String id, String action, String str } public StringBuilder sendPOST(String url, Map variables) throws IOException { - StringBuilder response = null; try (CloseableHttpClient httpClient = getHttpClient()) { @@ -1352,7 +1418,8 @@ public synchronized void incrementEncoderNotOpenedError(String streamId) { final String category = broadcast.getCategory(); final String metaData = broadcast.getMetaData(); logger.info("Setting timer to call encoder not opened error for stream:{}", streamId); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, metaData)); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, null, null, null, metaData)); + } } } @@ -1380,8 +1447,8 @@ public synchronized void publishTimeoutError(String streamId, String subscriberI JSONObject jsonResponse = new JSONObject(); jsonResponse.put(WebSocketConstants.SUBSCRIBER_ID, subscriberId); - - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, jsonResponse.toJSONString())); + + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, null, null, null, jsonResponse.toJSONString())); } } } @@ -1493,7 +1560,6 @@ private boolean isEncoderSettingsValid(List encoderSettingsList /** * * @param newSettings - * @param checkUpdateTime * @return true if timing is valid, false if it is invalid */ public boolean isIncomingTimeValid(AppSettings newSettings) @@ -1681,7 +1747,7 @@ public void endpointFailedUpdate(String streamId, String url) { logger.info("Setting timer to call rtmp endpoint failed hook for stream:{}", streamId); JSONObject jsonObject = new JSONObject(); jsonObject.put("rtmp-url", url); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, jsonObject.toJSONString())); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, null, null, null, jsonObject.toJSONString())); } } } @@ -1767,6 +1833,7 @@ public boolean stopPlaying(String viewerId) { return false; } + public boolean stopPlayingBySubscriberId(String subscriberId){ return false; } diff --git a/src/main/java/io/antmedia/console/rest/AuthenticationFilter.java b/src/main/java/io/antmedia/console/rest/AuthenticationFilter.java index fef2abd9c..6298546d6 100644 --- a/src/main/java/io/antmedia/console/rest/AuthenticationFilter.java +++ b/src/main/java/io/antmedia/console/rest/AuthenticationFilter.java @@ -116,9 +116,9 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha if (serverSettings != null && serverSettings.isJwtServerControlEnabled() && jwtToken != null) - { - if(checkJWT(jwtToken)) { + if(checkJWTServerSettings(jwtToken)) { + chain.doFilter(request, response); } else { @@ -247,7 +247,7 @@ private boolean scopeAccessGranted(String userScope, String dispatchUrl){ } - private boolean checkJWT( String jwtString) { + private boolean checkJWTServerSettings(String jwtString) { boolean result = true; try { diff --git a/src/main/java/io/antmedia/datastore/db/types/WebRTCViewerInfo.java b/src/main/java/io/antmedia/datastore/db/types/WebRTCViewerInfo.java index 44260e053..118966224 100644 --- a/src/main/java/io/antmedia/datastore/db/types/WebRTCViewerInfo.java +++ b/src/main/java/io/antmedia/datastore/db/types/WebRTCViewerInfo.java @@ -40,7 +40,12 @@ public class WebRTCViewerInfo { @ApiModelProperty(value = "IP address of the edge to which viewer is connected") private String edgeAddress; - + /** + * Play token of the viewer + */ + @ApiModelProperty(value = "the token of the viewer") + private String tokenId; + public ObjectId getDbId() { return dbId; } @@ -72,5 +77,12 @@ public String getEdgeAddress() { public void setEdgeAddress(String edgeAddress) { this.edgeAddress = edgeAddress; } - + + public String getTokenId() { + return tokenId; + } + + public void setTokenId(String tokenId) { + this.tokenId = tokenId; + } } diff --git a/src/main/java/io/antmedia/filter/AbstractFilter.java b/src/main/java/io/antmedia/filter/AbstractFilter.java index c1f08c4f5..04cdbc031 100644 --- a/src/main/java/io/antmedia/filter/AbstractFilter.java +++ b/src/main/java/io/antmedia/filter/AbstractFilter.java @@ -9,6 +9,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; +import io.antmedia.AntMediaApplicationAdapter; import io.antmedia.security.ITokenService; import org.apache.catalina.util.NetMask; import org.slf4j.Logger; @@ -19,7 +20,6 @@ import io.antmedia.AppSettings; import io.antmedia.datastore.db.DataStore; -import io.antmedia.datastore.db.DataStoreFactory; import io.antmedia.datastore.db.IDataStoreFactory; import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.settings.ServerSettings; @@ -165,6 +165,16 @@ public Broadcast getBroadcast(HttpServletRequest request, String streamId) { } return broadcast; } + + protected AntMediaApplicationAdapter getAntMediaApplicationAdapter() { + AntMediaApplicationAdapter antMediaApplicationAdapter = null; + ApplicationContext context = getAppContext(); + if (context != null) { + antMediaApplicationAdapter = (AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME); + } + return antMediaApplicationAdapter; + } + public DataStore getDataStore(){ ConfigurableWebApplicationContext appContext = getWebApplicationContext(); if (appContext != null && appContext.isRunning()) diff --git a/src/main/java/io/antmedia/filter/DashStatisticsFilter.java b/src/main/java/io/antmedia/filter/DashStatisticsFilter.java index 2ca961930..30fe8b5df 100644 --- a/src/main/java/io/antmedia/filter/DashStatisticsFilter.java +++ b/src/main/java/io/antmedia/filter/DashStatisticsFilter.java @@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.HttpMethod; +import io.antmedia.statistic.ViewerStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,8 +25,6 @@ public class DashStatisticsFilter extends AbstractFilter { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - - HttpServletRequest httpRequest =(HttpServletRequest)request; String method = httpRequest.getMethod(); @@ -52,8 +51,12 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status); IStreamStats stats = getStreamStats(DashViewerStats.BEAN_NAME); if (stats != null) { - stats.registerNewViewer(streamId, sessionId, subscriberId); - + String tokenId = httpRequest.getParameter("token"); + if(tokenId != null && tokenId.equals("undefined")){ + tokenId = null; + } + stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.DASH_TYPE, tokenId, getAntMediaApplicationAdapter()); + } } } diff --git a/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java b/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java index 954727e40..9989397a2 100644 --- a/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java +++ b/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java @@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.HttpMethod; +import io.antmedia.statistic.ViewerStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +48,12 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status); IStreamStats stats = getStreamStats(HlsViewerStats.BEAN_NAME); if (stats != null) { - stats.registerNewViewer(streamId, sessionId, subscriberId); - + String tokenId = httpRequest.getParameter("token"); + if(tokenId != null && tokenId.equals("undefined")){ + tokenId = null; + } + stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.HLS_TYPE, tokenId, getAntMediaApplicationAdapter()); + } } } diff --git a/src/main/java/io/antmedia/filter/JWTFilter.java b/src/main/java/io/antmedia/filter/JWTFilter.java index 6a1be36ab..e27f81f83 100644 --- a/src/main/java/io/antmedia/filter/JWTFilter.java +++ b/src/main/java/io/antmedia/filter/JWTFilter.java @@ -51,6 +51,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha } private boolean checkJWT(String jwtString) { + boolean result = false; String jwksURL = appSettings.getJwksURL(); diff --git a/src/main/java/io/antmedia/muxer/MuxAdaptor.java b/src/main/java/io/antmedia/muxer/MuxAdaptor.java index d25c879a7..35b1ef918 100644 --- a/src/main/java/io/antmedia/muxer/MuxAdaptor.java +++ b/src/main/java/io/antmedia/muxer/MuxAdaptor.java @@ -1752,7 +1752,6 @@ public RecordMuxer startRecording(RecordType recordType, int resolutionHeight) { return null; } - RecordMuxer muxer = null; if(recordType == RecordType.MP4) { Mp4Muxer mp4Muxer = createMp4Muxer(); @@ -1917,10 +1916,14 @@ public Result startRtmpStreaming(String rtmpUrl, int resolutionHeight) } public void sendEndpointErrorNotifyHook(String url){ + AntMediaApplicationAdapter adaptor = getAntMediaApplicationAdaptor(); + adaptor.endpointFailedUpdate(this.streamId, url); + } + + protected AntMediaApplicationAdapter getAntMediaApplicationAdaptor(){ IContext context = MuxAdaptor.this.scope.getContext(); ApplicationContext appCtx = context.getApplicationContext(); - AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME); - adaptor.endpointFailedUpdate(this.streamId, url); + return (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME); } /** diff --git a/src/main/java/io/antmedia/rest/BroadcastRestService.java b/src/main/java/io/antmedia/rest/BroadcastRestService.java index f7b007868..7a58eaf84 100644 --- a/src/main/java/io/antmedia/rest/BroadcastRestService.java +++ b/src/main/java/io/antmedia/rest/BroadcastRestService.java @@ -73,10 +73,10 @@ schemes = {SwaggerDefinition.Scheme.HTTP, SwaggerDefinition.Scheme.HTTPS}, externalDocs = @ExternalDocs(value = "External Docs", url = "https://antmedia.io"), host = "test.antmedia.io:5443/Sandbox/rest/" - ) +) @Component @Path("/v2/broadcasts") -public class BroadcastRestService extends RestServiceBase{ +public class BroadcastRestService extends RestServiceBase { private static final String REPLACE_CHARS = "[\n|\r|\t]"; @@ -85,7 +85,7 @@ public class BroadcastRestService extends RestServiceBase{ private static final String ABSOLUTE_MOVE = "absolute"; private static final String CONTINUOUS_MOVE = "continuous"; - @ApiModel(value="SimpleStat", description="Simple generic statistics class to return single values") + @ApiModel(value = "SimpleStat", description = "Simple generic statistics class to return single values") public static class SimpleStat { @ApiModelProperty(value = "the stat value") public long number; @@ -99,9 +99,8 @@ public long getNumber() { } } - @ApiModel(value="WebRTCSendStats", description="Aggregation of WebRTC Low Level Send Stats") - public static class WebRTCSendStats - { + @ApiModel(value = "WebRTCSendStats", description = "Aggregation of WebRTC Low Level Send Stats") + public static class WebRTCSendStats { @ApiModelProperty(value = "Audio send stats") private final WebRTCAudioSendStats audioSendStats; @@ -122,9 +121,8 @@ public WebRTCAudioSendStats getAudioSendStats() { } } - @ApiModel(value="WebRTCReceiveStats", description="Aggregation of WebRTC Low Level Receive Stats") - public static class WebRTCReceiveStats - { + @ApiModel(value = "WebRTCReceiveStats", description = "Aggregation of WebRTC Low Level Receive Stats") + public static class WebRTCReceiveStats { @ApiModelProperty(value = "Audio receive stats") private final WebRTCAudioReceiveStats audioReceiveStats; @@ -149,39 +147,33 @@ public WebRTCAudioReceiveStats getAudioReceiveStats() { @ApiOperation(value = "Creates a Broadcast, IP Camera or Stream Source and returns the full broadcast object with rtmp address and " + "other information. The different between Broadcast and IP Camera or Stream Source is that Broadcast is ingested by Ant Media Server" + "IP Camera or Stream Source is pulled by Ant Media Server") - @ApiResponses(value = { @ApiResponse(code = 400, message = "If stream id is already used in the data store, it returns error", response=Result.class), + @ApiResponses(value = {@ApiResponse(code = 400, message = "If stream id is already used in the data store, it returns error", response = Result.class), @ApiResponse(code = 200, message = "Returns the created stream", response = Broadcast.class)}) @POST - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/create") @ApiModelProperty(readOnly = true) @Produces(MediaType.APPLICATION_JSON) public Response createBroadcast(@ApiParam(value = "Broadcast object. Set the required fields, it may be null as well.", required = false) Broadcast broadcast, - @ApiParam(value = "Only effective if stream is IP Camera or Stream Source. If it's true, it starts automatically pulling stream. Its value is false by default", required = false, defaultValue="false") @QueryParam("autoStart") boolean autoStart) { + @ApiParam(value = "Only effective if stream is IP Camera or Stream Source. If it's true, it starts automatically pulling stream. Its value is false by default", required = false, defaultValue = "false") @QueryParam("autoStart") boolean autoStart) { if (broadcast != null && broadcast.getStreamId() != null) { try { broadcast.setStreamId(broadcast.getStreamId().trim()); - if (!broadcast.getStreamId().isEmpty()) - { + if (!broadcast.getStreamId().isEmpty()) { // make sure stream id is not set on rest service Broadcast broadcastTmp = getDataStore().get(broadcast.getStreamId()); - if (broadcastTmp != null) - { + if (broadcastTmp != null) { return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Stream id is already being used. Please change stream id or keep it empty")).build(); - } - else if (!StreamIdValidator.isStreamIdValid(broadcast.getStreamId())) - { + } else if (!StreamIdValidator.isStreamIdValid(broadcast.getStreamId())) { return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Stream id is not valid.")).build(); } } - } - catch (Exception e) - { + } catch (Exception e) { logger.error(ExceptionUtils.getStackTrace(e)); - return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Stream id set generated exception")).build(); + return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Stream id set generated exception")).build(); } @@ -189,27 +181,25 @@ else if (!StreamIdValidator.isStreamIdValid(broadcast.getStreamId())) Object returnObject = new Result(false, "unexpected parameters received"); - if (autoStart) - { + if (autoStart) { //auto is only effective for IP Camera or Stream Source //so if it's true, it should be IP Camera or Stream Soruce //otherwise wrong parameter if (broadcast != null) { returnObject = addStreamSource(broadcast); } - } - else { + } else { //TODO we need to refactor this method. Refactor validateStreamURL and checkStreamURL - if (broadcast != null && + if (broadcast != null && ((AntMediaApplicationAdapter.IP_CAMERA.equals(broadcast.getType()) && !validateStreamURL(broadcast.getIpAddr())) - || + || (AntMediaApplicationAdapter.STREAM_SOURCE.equals(broadcast.getType()) && !checkStreamUrl(broadcast.getStreamUrl())) - ) - ) { + ) + ) { return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Stream url is not valid. ")).build(); } - if(broadcast != null && broadcast.getSubFolder() != null) { - if(broadcast.getSubFolder().contains("..")) + if (broadcast != null && broadcast.getSubFolder() != null) { + if (broadcast.getSubFolder().contains("..")) return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Subfolder is not valid. ")).build(); } returnObject = createBroadcastWithStreamID(broadcast); @@ -220,31 +210,30 @@ else if (!StreamIdValidator.isStreamIdValid(broadcast.getStreamId())) } @ApiOperation(value = "Delete broadcast from data store and stop if it's broadcasting", response = Result.class) - @ApiResponses(value = { @ApiResponse(code = 200, message = "If it's deleted, success is true. If it's not deleted, success if false.") }) + @ApiResponses(value = {@ApiResponse(code = 200, message = "If it's deleted, success is true. If it's not deleted, success if false.")}) @DELETE - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/{id}") @Produces(MediaType.APPLICATION_JSON) @Override public Result deleteBroadcast(@ApiParam(value = " Id of the broadcast", required = true) @PathParam("id") String id) { - return super.deleteBroadcast(id); + return super.deleteBroadcast(id); } @ApiOperation(value = "Delete multiple broadcasts from data store and stop if they are broadcasting", response = Result.class) - @ApiResponses(value = { @ApiResponse(code = 200, message = "If it's deleted, success is true. If it's not deleted, success if false.") }) + @ApiResponses(value = {@ApiResponse(code = 200, message = "If it's deleted, success is true. If it's not deleted, success if false.")}) @DELETE - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/bulk") @Produces(MediaType.APPLICATION_JSON) @Override - public Result deleteBroadcasts(@ApiParam(value = " Id of the broadcast", required = true) String[] streamIds) - { + public Result deleteBroadcasts(@ApiParam(value = " Id of the broadcast", required = true) String[] streamIds) { return super.deleteBroadcasts(streamIds); } @ApiOperation(value = "Get broadcast object") - @ApiResponses(value = { @ApiResponse(code = 200, message = "Return the broadcast object"), + @ApiResponses(value = {@ApiResponse(code = 200, message = "Return the broadcast object"), @ApiResponse(code = 404, message = "Broadcast object not found")}) @GET @Path("/{id}") @@ -256,49 +245,44 @@ public Response getBroadcast(@ApiParam(value = "id of the broadcast", required = } if (broadcast != null) { return Response.status(Status.OK).entity(broadcast).build(); - } - else { + } else { return Response.status(Status.NOT_FOUND).build(); } } - @ApiOperation(value = "Gets the broadcast list from database. It returns max 50 items at a time", notes = "",responseContainer = "List", response = Broadcast.class) + @ApiOperation(value = "Gets the broadcast list from database. It returns max 50 items at a time", notes = "", responseContainer = "List", response = Broadcast.class) @GET @Path("/list/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List getBroadcastList(@ApiParam(value = "This is the offset of the list, it is useful for pagination. If you want to use sort mechanism, we recommend using Mongo DB.", required = true) @PathParam("offset") int offset, - @ApiParam(value = "Number of items that will be fetched. If there is not enough item in the datastore, returned list size may less then this value", required = true) @PathParam("size") int size, - @ApiParam(value = "Type of the stream. Possible values are \"liveStream\", \"ipCamera\", \"streamSource\", \"VoD\"", required = false) @QueryParam("type_by") String typeBy, - @ApiParam(value = "Field to sort. Possible values are \"name\", \"date\", \"status\"", required = false) @QueryParam("sort_by") String sortBy, - @ApiParam(value = "\"asc\" for Ascending, \"desc\" Descending order", required = false) @QueryParam("order_by") String orderBy, - @ApiParam(value = "Search parameter, returns specific items that contains search string", required = false) @QueryParam("search") String search - ) { + @ApiParam(value = "Number of items that will be fetched. If there is not enough item in the datastore, returned list size may less then this value", required = true) @PathParam("size") int size, + @ApiParam(value = "Type of the stream. Possible values are \"liveStream\", \"ipCamera\", \"streamSource\", \"VoD\"", required = false) @QueryParam("type_by") String typeBy, + @ApiParam(value = "Field to sort. Possible values are \"name\", \"date\", \"status\"", required = false) @QueryParam("sort_by") String sortBy, + @ApiParam(value = "\"asc\" for Ascending, \"desc\" Descending order", required = false) @QueryParam("order_by") String orderBy, + @ApiParam(value = "Search parameter, returns specific items that contains search string", required = false) @QueryParam("search") String search + ) { return getDataStore().getBroadcastList(offset, size, typeBy, sortBy, orderBy, search); } - @ApiOperation(value = "Updates the Broadcast objects fields if it's not null." + - " The updated fields are as follows: name, description, userName, password, IP address, streamUrl of the broadcast. " + + @ApiOperation(value = "Updates the Broadcast objects fields if it's not null." + + " The updated fields are as follows: name, description, userName, password, IP address, streamUrl of the broadcast. " + "It also updates the social endpoints", notes = "", response = Result.class) - @ApiResponses(value = { @ApiResponse(code = 200, message = "If it's updated, success field is true. If it's not updated, success field if false.")}) + @ApiResponses(value = {@ApiResponse(code = 200, message = "If it's updated, success field is true. If it's not updated, success field if false.")}) @PUT @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}") @Produces(MediaType.APPLICATION_JSON) @Override - public Result updateBroadcast(@ApiParam(value="Broadcast id", required = true) @PathParam("id") String id, - @ApiParam(value="Broadcast object with the updates") Broadcast broadcast) { + public Result updateBroadcast(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, + @ApiParam(value = "Broadcast object with the updates") Broadcast broadcast) { Result result = new Result(false); - if (id != null && broadcast != null) - { - if (broadcast.getType() != null && - (broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) || - broadcast.getType().equals(AntMediaApplicationAdapter.STREAM_SOURCE))) - { + if (id != null && broadcast != null) { + if (broadcast.getType() != null && + (broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) || + broadcast.getType().equals(AntMediaApplicationAdapter.STREAM_SOURCE))) { result = super.updateStreamSource(id, broadcast); - } - else - { + } else { result = super.updateBroadcast(id, broadcast); } @@ -313,20 +297,17 @@ public Result updateBroadcast(@ApiParam(value="Broadcast id", required = true) @ @Path("/{id}/endpoint") @Produces(MediaType.APPLICATION_JSON) public Result addEndpointV2(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, - @ApiParam(value = "RTMP url of the endpoint that stream will be republished. If required, please encode the URL", required = true) @QueryParam("rtmpUrl") String rtmpUrl) { + @ApiParam(value = "RTMP url of the endpoint that stream will be republished. If required, please encode the URL", required = true) @QueryParam("rtmpUrl") String rtmpUrl) { Result result = super.addEndpoint(id, rtmpUrl); - if (result.isSuccess()) - { + if (result.isSuccess()) { String status = getDataStore().get(id).getStatus(); - if (status.equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)) - { + if (status.equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)) { result = getMuxAdaptor(id).startRtmpStreaming(rtmpUrl, 0); } - } - else { + } else { if (logger.isErrorEnabled()) { - logger.error("Rtmp endpoint({}) was not added to the stream: {}", rtmpUrl != null ? rtmpUrl.replaceAll(REPLACE_CHARS, "_") : null , id.replaceAll(REPLACE_CHARS, "_")); + logger.error("Rtmp endpoint({}) was not added to the stream: {}", rtmpUrl != null ? rtmpUrl.replaceAll(REPLACE_CHARS, "_") : null, id.replaceAll(REPLACE_CHARS, "_")); } } @@ -339,50 +320,41 @@ public Result addEndpointV2(@ApiParam(value = "Broadcast id", required = true) @ @Path("/{id}/rtmp-endpoint") @Produces(MediaType.APPLICATION_JSON) public Result addEndpointV3(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, - @ApiParam(value = "RTMP url of the endpoint that stream will be republished. If required, please encode the URL", required = true) Endpoint endpoint, - @ApiParam(value = "Resolution height of the broadcast that is wanted to send to the RTMP endpoint. ", required = false) @QueryParam("resolutionHeight") int resolutionHeight) { + @ApiParam(value = "RTMP url of the endpoint that stream will be republished. If required, please encode the URL", required = true) Endpoint endpoint, + @ApiParam(value = "Resolution height of the broadcast that is wanted to send to the RTMP endpoint. ", required = false) @QueryParam("resolutionHeight") int resolutionHeight) { String rtmpUrl = null; Result result = new Result(false); - if(endpoint != null && endpoint.getRtmpUrl() != null) { + if (endpoint != null && endpoint.getRtmpUrl() != null) { Broadcast broadcast = getDataStore().get(id); if (broadcast != null) { List endpoints = broadcast.getEndPointList(); - if (endpoints == null || endpoints.stream().noneMatch(o -> o.getRtmpUrl().equals(endpoint.getRtmpUrl()))) - { + if (endpoints == null || endpoints.stream().noneMatch(o -> o.getRtmpUrl().equals(endpoint.getRtmpUrl()))) { rtmpUrl = endpoint.getRtmpUrl(); - if (broadcast.getStatus().equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)) - { + if (broadcast.getStatus().equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)) { result = processRTMPEndpoint(broadcast.getStreamId(), broadcast.getOriginAdress(), rtmpUrl, true, resolutionHeight); - if (result.isSuccess()) - { + if (result.isSuccess()) { result = super.addEndpoint(id, endpoint); } - } - else - { + } else { result = super.addEndpoint(id, endpoint); } - if (!result.isSuccess()) - { + if (!result.isSuccess()) { result.setMessage("Rtmp endpoint is not added to stream: " + id); } logRtmpEndpointInfo(id, endpoint, result.isSuccess()); - } - else - { + } else { result.setMessage("Rtmp endpoint is not added to datastore for stream " + id + ". It is already added ->" + endpoint.getRtmpUrl()); } } - } - else { + } else { result.setMessage("Missing rtmp url"); } @@ -391,7 +363,7 @@ public Result addEndpointV3(@ApiParam(value = "Broadcast id", required = true) @ private void logRtmpEndpointInfo(String id, Endpoint endpoint, boolean result) { if (logger.isInfoEnabled()) { - logger.info("Rtmp endpoint({}) adding to the stream: {} is {}", endpoint.getRtmpUrl().replaceAll(REPLACE_CHARS, "_") , id.replaceAll(REPLACE_CHARS, "_"), result); + logger.info("Rtmp endpoint({}) adding to the stream: {} is {}", endpoint.getRtmpUrl().replaceAll(REPLACE_CHARS, "_"), id.replaceAll(REPLACE_CHARS, "_"), result); } } @@ -400,21 +372,18 @@ private void logRtmpEndpointInfo(String id, Endpoint endpoint, boolean result) { @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}/endpoint") @Produces(MediaType.APPLICATION_JSON) - public Result removeEndpoint(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, - @ApiParam(value = "RTMP url of the endpoint that will be stopped.", required = true) @QueryParam("rtmpUrl") String rtmpUrl ) { + public Result removeEndpoint(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, + @ApiParam(value = "RTMP url of the endpoint that will be stopped.", required = true) @QueryParam("rtmpUrl") String rtmpUrl) { Result result = super.removeEndpoint(id, rtmpUrl); - if (result.isSuccess()) - { + if (result.isSuccess()) { String status = getDataStore().get(id).getStatus(); - if (status.equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)) - { + if (status.equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)) { result = getMuxAdaptor(id).stopRtmpStreaming(rtmpUrl, 0); } - } - else { + } else { if (logger.isErrorEnabled()) { - logger.error("Rtmp endpoint({}) was not removed from the stream: {}", rtmpUrl != null ? rtmpUrl.replaceAll(REPLACE_CHARS, "_") : null , id.replaceAll(REPLACE_CHARS, "_")); + logger.error("Rtmp endpoint({}) was not removed from the stream: {}", rtmpUrl != null ? rtmpUrl.replaceAll(REPLACE_CHARS, "_") : null, id.replaceAll(REPLACE_CHARS, "_")); } } @@ -426,28 +395,26 @@ public Result removeEndpoint(@ApiParam(value = "Broadcast id", required = true) @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}/rtmp-endpoint") @Produces(MediaType.APPLICATION_JSON) - public Result removeEndpointV2(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, - @ApiParam(value = "RTMP url of the endpoint that will be stopped.", required = true) @QueryParam("endpointServiceId") String endpointServiceId, - @ApiParam(value = "Resolution specifier if endpoint has been added with resolution. Only applicable if user added RTMP endpoint with a resolution speficier. Otherwise won't work and won't remove the endpoint.", required = true) - @QueryParam("resolutionHeight") int resolutionHeight){ + public Result removeEndpointV2(@ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id, + @ApiParam(value = "RTMP url of the endpoint that will be stopped.", required = true) @QueryParam("endpointServiceId") String endpointServiceId, + @ApiParam(value = "Resolution specifier if endpoint has been added with resolution. Only applicable if user added RTMP endpoint with a resolution speficier. Otherwise won't work and won't remove the endpoint.", required = true) + @QueryParam("resolutionHeight") int resolutionHeight) { //Get rtmpURL with broadcast String rtmpUrl = null; Broadcast broadcast = getDataStore().get(id); Result result = new Result(false); - if (broadcast != null && endpointServiceId != null && broadcast.getEndPointList() != null && !broadcast.getEndPointList().isEmpty()) - { + if (broadcast != null && endpointServiceId != null && broadcast.getEndPointList() != null && !broadcast.getEndPointList().isEmpty()) { Endpoint endpoint = getRtmpUrlFromList(endpointServiceId, broadcast); if (endpoint != null && endpoint.getRtmpUrl() != null) { rtmpUrl = endpoint.getRtmpUrl(); - result = removeRTMPEndpointProcess(broadcast, endpoint, resolutionHeight, id); + result = removeRTMPEndpointProcess(broadcast, endpoint, resolutionHeight, id); } - } - if (logger.isInfoEnabled()) - { - logger.info("Rtmp endpoint({}) removal operation is {} from the stream: {}", rtmpUrl != null ? rtmpUrl.replaceAll(REPLACE_CHARS, "_") : null , result.isSuccess(), id.replaceAll(REPLACE_CHARS, "_")); + } + if (logger.isInfoEnabled()) { + logger.info("Rtmp endpoint({}) removal operation is {} from the stream: {}", rtmpUrl != null ? rtmpUrl.replaceAll(REPLACE_CHARS, "_") : null, result.isSuccess(), id.replaceAll(REPLACE_CHARS, "_")); } return result; } @@ -455,16 +422,12 @@ public Result removeEndpointV2(@ApiParam(value = "Broadcast id", required = true private Result removeRTMPEndpointProcess(Broadcast broadcast, Endpoint endpoint, int resolutionHeight, String id) { Result result; - if (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(broadcast.getStatus())) - { + if (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(broadcast.getStatus())) { result = processRTMPEndpoint(broadcast.getStreamId(), broadcast.getOriginAdress(), endpoint.getRtmpUrl(), false, resolutionHeight); - if (result.isSuccess()) - { + if (result.isSuccess()) { result = super.removeRTMPEndpoint(id, endpoint); } - } - else - { + } else { result = super.removeRTMPEndpoint(id, endpoint); } @@ -474,9 +437,8 @@ private Result removeRTMPEndpointProcess(Broadcast broadcast, Endpoint endpoint, private Endpoint getRtmpUrlFromList(String endpointServiceId, Broadcast broadcast) { Endpoint endpoint = null; - for(Endpoint selectedEndpoint: broadcast.getEndPointList()) - { - if(selectedEndpoint.getEndpointServiceId().equals(endpointServiceId)) { + for (Endpoint selectedEndpoint : broadcast.getEndPointList()) { + if (selectedEndpoint.getEndpointServiceId().equals(endpointServiceId)) { endpoint = selectedEndpoint; } } @@ -484,13 +446,13 @@ private Endpoint getRtmpUrlFromList(String endpointServiceId, Broadcast broadcas } - @ApiOperation(value = "Get detected objects from the stream based on offset and size", notes = "",responseContainer = "List", response = TensorFlowObject.class) + @ApiOperation(value = "Get detected objects from the stream based on offset and size", notes = "", responseContainer = "List", response = TensorFlowObject.class) @GET @Path("/{id}/detections/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List getDetectionListV2(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String id, - @ApiParam(value = "starting point of the list", required = true) @PathParam("offset") int offset, - @ApiParam(value = "total size of the return list", required = true) @PathParam("size") int size) { + @ApiParam(value = "starting point of the list", required = true) @PathParam("offset") int offset, + @ApiParam(value = "total size of the return list", required = true) @PathParam("size") int size) { return super.getDetectionList(id, offset, size); } @@ -498,7 +460,7 @@ public List getDetectionListV2(@ApiParam(value = "the id of th @GET @Path("/{id}/detections/count") @Produces(MediaType.APPLICATION_JSON) - public SimpleStat getObjectDetectedTotal(@ApiParam(value = "id of the stream", required = true) @PathParam("id") String id){ + public SimpleStat getObjectDetectedTotal(@ApiParam(value = "id of the stream", required = true) @PathParam("id") String id) { return new SimpleStat(getDataStore().getObjectDetectedTotal(id)); } @@ -506,8 +468,7 @@ public SimpleStat getObjectDetectedTotal(@ApiParam(value = "id of the stream", r @POST @Path("/import-to-stalker") @Produces(MediaType.APPLICATION_JSON) - public Result importLiveStreams2StalkerV2() - { + public Result importLiveStreams2StalkerV2() { return super.importLiveStreams2Stalker(); } @@ -525,8 +486,7 @@ public SimpleStat getTotalBroadcastNumberV2() { @Path("/count/{search}") @Produces(MediaType.APPLICATION_JSON) public SimpleStat getTotalBroadcastNumberV2( - @ApiParam(value = "Search parameter to get the number of items including it ", required = true) @PathParam("search") String search) - { + @ApiParam(value = "Search parameter to get the number of items including it ", required = true) @PathParam("search") String search) { return new SimpleStat(getDataStore().getPartialBroadcastNumber(search)); } @@ -539,45 +499,39 @@ public SimpleStat getAppLiveStatistics() { } - - @ApiOperation(value = "Generates random one-time token for specified stream") - @ApiResponses(value = { @ApiResponse(code = 200, message = "Returns token", response=Token.class), - @ApiResponse(code = 400, message = "When there is an error in creating token", response=Result.class)}) + @ApiResponses(value = {@ApiResponse(code = 200, message = "Returns token", response = Token.class), + @ApiResponse(code = 400, message = "When there is an error in creating token", response = Result.class)}) @GET @Path("/{id}/token") @Produces(MediaType.APPLICATION_JSON) - public Response getTokenV2 (@ApiParam(value = "The id of the stream", required = true) @PathParam("id")String streamId, - @ApiParam(value = "The expire time of the token. It's in unix timestamp seconds", required = true) @QueryParam("expireDate") long expireDate, - @ApiParam(value = "Type of the token. It may be play or publish ", required = true) @QueryParam("type") String type, - @ApiParam(value = "Room Id that token belongs to. It's not mandatory ", required = false) @QueryParam("roomId") String roomId) - { + public Response getTokenV2(@ApiParam(value = "The id of the stream", required = true) @PathParam("id") String streamId, + @ApiParam(value = "The expire time of the token. It's in unix timestamp seconds", required = true) @QueryParam("expireDate") long expireDate, + @ApiParam(value = "Type of the token. It may be play or publish ", required = true) @QueryParam("type") String type, + @ApiParam(value = "Room Id that token belongs to. It's not mandatory ", required = false) @QueryParam("roomId") String roomId) { Object result = super.getToken(streamId, expireDate, type, roomId); if (result instanceof Token) { return Response.status(Status.OK).entity(result).build(); - } - else { + } else { return Response.status(Status.BAD_REQUEST).entity(result).build(); } } - + @ApiOperation(value = "Generates JWT token for specified stream. It's not required to let the server generate JWT. Generally JWT tokens should be generated on the client side.") - @ApiResponses(value = { @ApiResponse(code = 200, message = "Returns token", response=Token.class), - @ApiResponse(code = 400, message = "When there is an error in creating token", response=Result.class)}) + @ApiResponses(value = {@ApiResponse(code = 200, message = "Returns token", response = Token.class), + @ApiResponse(code = 400, message = "When there is an error in creating token", response = Result.class)}) @GET @Path("/{id}/jwt-token") @Produces(MediaType.APPLICATION_JSON) - public Response getJwtTokenV2 (@ApiParam(value = "The id of the stream", required = true) @PathParam("id")String streamId, - @ApiParam(value = "The expire time of the token. It's in unix timestamp seconds.", required = true) @QueryParam("expireDate") long expireDate, - @ApiParam(value = "Type of the JWT token. It may be play or publish ", required = true) @QueryParam("type") String type, - @ApiParam(value = "Room Id that token belongs to. It's not mandatory ", required = false) @QueryParam("roomId") String roomId) - { + public Response getJwtTokenV2(@ApiParam(value = "The id of the stream", required = true) @PathParam("id") String streamId, + @ApiParam(value = "The expire time of the token. It's in unix timestamp seconds.", required = true) @QueryParam("expireDate") long expireDate, + @ApiParam(value = "Type of the JWT token. It may be play or publish ", required = true) @QueryParam("type") String type, + @ApiParam(value = "Room Id that token belongs to. It's not mandatory ", required = false) @QueryParam("roomId") String roomId) { Object result = super.getJwtToken(streamId, expireDate, type, roomId); if (result instanceof Token) { return Response.status(Status.OK).entity(result).build(); - } - else { + } else { return Response.status(Status.BAD_REQUEST).entity(result).build(); } } @@ -588,9 +542,8 @@ public Response getJwtTokenV2 (@ApiParam(value = "The id of the stream", require @Consumes(MediaType.APPLICATION_JSON) @Path("/validate-token") @Produces(MediaType.APPLICATION_JSON) - public Result validateTokenV2(@ApiParam(value = "Token to be validated", required = true) Token token) - { - boolean result = false; + public Result validateTokenV2(@ApiParam(value = "Token to be validated", required = true) Token token) { + boolean result = false; Token validateToken = super.validateToken(token); if (validateToken != null) { result = true; @@ -610,43 +563,43 @@ public Result revokeTokensV2(@ApiParam(value = "the id of the stream", required } - @ApiOperation(value = "Get the all tokens of requested stream", notes = "",responseContainer = "List", response = Token.class) + @ApiOperation(value = "Get the all tokens of requested stream", notes = "", responseContainer = "List", response = Token.class) @GET @Path("/{id}/tokens/list/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List listTokensV2(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId, - @ApiParam(value = "the starting point of the list", required = true) @PathParam("offset") int offset, - @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size) { + @ApiParam(value = "the starting point of the list", required = true) @PathParam("offset") int offset, + @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size) { List tokens = null; - if(streamId != null) { + if (streamId != null) { tokens = getDataStore().listAllTokens(streamId, offset, size); } return tokens; } - @ApiOperation(value = "Get the all subscribers of the requested stream. It does not return subscriber-stats. Please use subscriber-stats method", notes = "",responseContainer = "List", response = Subscriber.class) + @ApiOperation(value = "Get the all subscribers of the requested stream. It does not return subscriber-stats. Please use subscriber-stats method", notes = "", responseContainer = "List", response = Subscriber.class) @GET @Path("/{id}/subscribers/list/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List listSubscriberV2(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId, - @ApiParam(value = "the starting point of the list", required = true) @PathParam("offset") int offset, - @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size) { + @ApiParam(value = "the starting point of the list", required = true) @PathParam("offset") int offset, + @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size) { List subscribers = null; - if(streamId != null) { + if (streamId != null) { subscribers = getDataStore().listAllSubscribers(streamId, offset, size); } return subscribers; - } + } - @ApiOperation(value = "Get the all subscriber statistics of the requested stream", notes = "",responseContainer = "List", response = SubscriberStats.class) + @ApiOperation(value = "Get the all subscriber statistics of the requested stream", notes = "", responseContainer = "List", response = SubscriberStats.class) @GET @Path("/{id}/subscriber-stats/list/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List listSubscriberStatsV2(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId, - @ApiParam(value = "the starting point of the list", required = true) @PathParam("offset") int offset, - @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size) { + @ApiParam(value = "the starting point of the list", required = true) @PathParam("offset") int offset, + @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size) { List subscriberStats = null; - if(streamId != null) { + if (streamId != null) { subscriberStats = getDataStore().listAllSubscriberStats(streamId, offset, size); } return subscriberStats; @@ -663,9 +616,8 @@ public Result addSubscriber( @ApiParam(value = "Subscriber to be added to this stream", required = true) Subscriber subscriber) { boolean result = false; String message = ""; - if (subscriber != null && !StringUtils.isBlank(subscriber.getSubscriberId()) - && subscriber.getSubscriberId().length() > 3) - { + if (subscriber != null && !StringUtils.isBlank(subscriber.getSubscriberId()) + && subscriber.getSubscriberId().length() > 3) { // add stream id inside the Subscriber subscriber.setStreamId(streamId); // create a new stats object before adding to datastore @@ -674,126 +626,112 @@ public Result addSubscriber( subscriber.setConnected(false); // subscriber is not viewing anyone subscriber.setCurrentConcurrentConnections(0); - + if (streamId != null) { result = getDataStore().addSubscriber(streamId, subscriber); - } - else { + } else { message = "StreamId is not specified in the request"; } - } - else { + } else { message = "Subscriber object must be set and subscriberId's length must be at least 3"; } return new Result(result, message); } - - @ApiOperation(value="Return TOTP for the subscriberId, streamId, type. This is a helper method. You can generate TOTP on your end." + + @ApiOperation(value = "Return TOTP for the subscriberId, streamId, type. This is a helper method. You can generate TOTP on your end." + "If subscriberId is not in the database, it generates TOTP from the secret in the AppSettings. Secret code is for the subscriberId not in the database" - + + " secretCode = Base32.encodeAsString({secretFromSettings(publishsecret or playsecret according to the type)} + {subscriberId} + {streamId} + {type(publish or play)} + {Number of X to have the length multiple of 8}" + "'+' means concatenating the strings. There is no explicit '+' in the secretCode ") @GET - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/{id}/subscribers/{sid}/totp") @Produces(MediaType.APPLICATION_JSON) - public Result getTOTP(@ApiParam(value="The id of the stream that TOTP will be generated", required=true) @PathParam("id") String streamId, - @ApiParam(value="The id of the subscriber that TOTP will be generated ", required=true) @PathParam("sid") String subscriberId, - @ApiParam(value="The type of token. It's being used if subscriber is not in the database. It can be publish, play", - required=false) @QueryParam("type") String type) - { - + public Result getTOTP(@ApiParam(value = "The id of the stream that TOTP will be generated", required = true) @PathParam("id") String streamId, + @ApiParam(value = "The id of the subscriber that TOTP will be generated ", required = true) @PathParam("sid") String subscriberId, + @ApiParam(value = "The type of token. It's being used if subscriber is not in the database. It can be publish, play", + required = false) @QueryParam("type") String type) { + boolean result = false; String message = ""; String totp = ""; - if (!StringUtils.isAnyBlank(streamId, subscriberId)) - { + if (!StringUtils.isAnyBlank(streamId, subscriberId)) { Subscriber subscriber = getDataStore().getSubscriber(streamId, subscriberId); - if (subscriber != null && StringUtils.isNotBlank(subscriber.getB32Secret())) - { - - totp = TOTPGenerator.generateTOTP(Base32.decode(subscriber.getB32Secret().getBytes()), getAppSettings().getTimeTokenPeriod(), 6, ITokenService.HMAC_SHA1); - } - else - { + if (subscriber != null && StringUtils.isNotBlank(subscriber.getB32Secret())) { + + totp = TOTPGenerator.generateTOTP(Base32.decode(subscriber.getB32Secret().getBytes()), getAppSettings().getTimeTokenPeriod(), 6, ITokenService.HMAC_SHA1); + } else { String secretFromSettings = getAppSettings().getTimeTokenSecretForPublish(); - if (Subscriber.PLAY_TYPE.equals(type)) - { + if (Subscriber.PLAY_TYPE.equals(type)) { secretFromSettings = getAppSettings().getTimeTokenSecretForPlay(); } - + if (StringUtils.isNotBlank(secretFromSettings)) { //Secret code is generated by using this secretFromSettings + subscriberId + streamId + type + "add number of X to have the length multiple of 8" totp = TOTPGenerator.generateTOTP(Base32.decode(TOTPGenerator.getSecretCodeForNotRecordedSubscriberId(subscriberId, streamId, type, secretFromSettings).getBytes()), - getAppSettings().getTimeTokenPeriod(), 6, ITokenService.HMAC_SHA1); - } - else { + getAppSettings().getTimeTokenPeriod(), 6, ITokenService.HMAC_SHA1); + } else { message = "Secret is not set in AppSettings. Please set timtokensecret publish or play in Applicaiton settings"; } - + } if (!StringUtils.isBlank(totp)) { result = true; } - - } - else { + + } else { message = "streamId or subscriberId is blank"; } - + return new Result(result, totp, message); - + } @ApiOperation(value = "Delete specific subscriber from data store for selected stream", response = Result.class) @DELETE - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/{id}/subscribers/{sid}") @Produces(MediaType.APPLICATION_JSON) public Result deleteSubscriber(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId, - @ApiParam(value = "the id of the subscriber", required = true) @PathParam("sid") String subscriberId) { - boolean result = false; + @ApiParam(value = "the id of the subscriber", required = true) @PathParam("sid") String subscriberId) { + boolean result = false; - if(streamId != null) { + if (streamId != null) { result = getDataStore().deleteSubscriber(streamId, subscriberId); } - return new Result(result); + return new Result(result); } @ApiOperation(value = "Block specific subscriber. It's secure to use this with TOTP streaming. It blocks the subscriber for seconds from the moment this method is called", response = Result.class) @PUT - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/{id}/subscribers/{sid}/block/{seconds}/{type}") @Produces(MediaType.APPLICATION_JSON) public Result blockSubscriber(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId, - @ApiParam(value = "the id of the subscriber", required = true) @PathParam("sid") String subscriberId, - @ApiParam(value = "seconds to block the user", required = true) @PathParam("seconds") int seconds, - @ApiParam(value = "block type it can be 'publish', 'play' or 'publish_play'", required = true) @PathParam("type") String blockType) { + @ApiParam(value = "the id of the subscriber", required = true) @PathParam("sid") String subscriberId, + @ApiParam(value = "seconds to block the user", required = true) @PathParam("seconds") int seconds, + @ApiParam(value = "block type it can be 'publish', 'play' or 'publish_play'", required = true) @PathParam("type") String blockType) { boolean result = false; String message = ""; - - if (!StringUtils.isAnyBlank(streamId, subscriberId)) - { + if (!StringUtils.isAnyBlank(streamId, subscriberId)) { //if the user is not in this node, it's in another node in the cluster. //The proxy filter will forward the request to the related node before {@link RestProxyFilter} - + result = getDataStore().blockSubscriber(streamId, subscriberId, blockType, seconds); - - if (Subscriber.PLAY_TYPE.equals(blockType) || Subscriber.PUBLISH_AND_PLAY_TYPE.equals(blockType) ) - { + + if (Subscriber.PLAY_TYPE.equals(blockType) || Subscriber.PUBLISH_AND_PLAY_TYPE.equals(blockType)) { getApplication().stopPlayingBySubscriberId(subscriberId); - } - + } + if (Subscriber.PUBLISH_TYPE.equals(blockType) || Subscriber.PUBLISH_AND_PLAY_TYPE.equals(blockType)) { getApplication().stopPublishingBySubscriberId(subscriberId); } - - - } - else { + + + } else { message = "streamId or subscriberId is blank"; } @@ -806,14 +744,14 @@ public Result blockSubscriber(@ApiParam(value = "the id of the stream", required @Path("/{id}/subscribers") @Produces(MediaType.APPLICATION_JSON) public Result revokeSubscribers(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId) { - boolean result = false; + boolean result = false; - if(streamId != null) { + if (streamId != null) { result = getDataStore().revokeSubscribers(streamId); } return new Result(result); - } + } @ApiOperation(value = "Get the broadcast live statistics total RTMP watcher count, total HLS watcher count, total WebRTC watcher count", notes = "", response = BroadcastStatistics.class) @GET @@ -833,57 +771,54 @@ public AppBroadcastStatistics getBroadcastTotalStatistics() { return super.getBroadcastTotalStatistics(); } - @ApiOperation(value = "Get WebRTC Low Level Send stats in general", notes = "",response = WebRTCSendStats.class) + @ApiOperation(value = "Get WebRTC Low Level Send stats in general", notes = "", response = WebRTCSendStats.class) @GET @Path("/webrtc-send-low-level-stats") @Produces(MediaType.APPLICATION_JSON) - public WebRTCSendStats getWebRTCLowLevelSendStats() - { + public WebRTCSendStats getWebRTCLowLevelSendStats() { return new WebRTCSendStats(getApplication().getWebRTCAudioSendStats(), getApplication().getWebRTCVideoSendStats()); } - @ApiOperation(value = "Get WebRTC Low Level receive stats in general", notes = "",response = WebRTCSendStats.class) + @ApiOperation(value = "Get WebRTC Low Level receive stats in general", notes = "", response = WebRTCSendStats.class) @GET @Path("/webrtc-receive-low-level-stats") @Produces(MediaType.APPLICATION_JSON) - public WebRTCReceiveStats getWebRTCLowLevelReceiveStats() - { + public WebRTCReceiveStats getWebRTCLowLevelReceiveStats() { return new WebRTCReceiveStats(getApplication().getWebRTCAudioReceiveStats(), getApplication().getWebRTCVideoReceiveStats()); } - @ApiOperation(value = "Get RTMP to WebRTC path stats in general", notes = "",response = RTMPToWebRTCStats.class) + @ApiOperation(value = "Get RTMP to WebRTC path stats in general", notes = "", response = RTMPToWebRTCStats.class) @GET @Path("/{id}/rtmp-to-webrtc-stats") @Produces(MediaType.APPLICATION_JSON) - public RTMPToWebRTCStats getRTMPToWebRTCStats(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String id) - { + public RTMPToWebRTCStats getRTMPToWebRTCStats(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String id) { return getApplication().getRTMPToWebRTCStats(id); } - @ApiOperation(value = "Get WebRTC Client Statistics such as : Audio bitrate, Video bitrate, Target bitrate, Video Sent Period etc.", notes = "", responseContainer = "List",response = WebRTCClientStats.class) + @ApiOperation(value = "Get WebRTC Client Statistics such as : Audio bitrate, Video bitrate, Target bitrate, Video Sent Period etc.", notes = "", responseContainer = "List", response = WebRTCClientStats.class) @GET @Path("/{stream_id}/webrtc-client-stats/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List getWebRTCClientStatsListV2(@ApiParam(value = "offset of the list", required = true) @PathParam("offset") int offset, - @ApiParam(value = "Number of items that will be fetched", required = true) @PathParam("size") int size, - @ApiParam(value = "the id of the stream", required = true) @PathParam("stream_id") String streamId) { + @ApiParam(value = "Number of items that will be fetched", required = true) @PathParam("size") int size, + @ApiParam(value = "the id of the stream", required = true) @PathParam("stream_id") String streamId) { return super.getWebRTCClientStatsList(offset, size, streamId); } @Deprecated - @ApiOperation(value = "Returns filtered broadcast list according to type. It's useful for getting IP Camera and Stream Sources from the whole list. If you want to use sort mechanism, we recommend using Mongo DB.", notes = "",responseContainer = "List",response = Broadcast.class) + @ApiOperation(value = "Returns filtered broadcast list according to type. It's useful for getting IP Camera and Stream Sources from the whole list. If you want to use sort mechanism, we recommend using Mongo DB.", notes = "", responseContainer = "List", response = Broadcast.class) @GET @Consumes(MediaType.APPLICATION_JSON) @Path("/filter-list/{offset}/{size}/{type}") @Produces(MediaType.APPLICATION_JSON) public List filterBroadcastListV2(@ApiParam(value = "starting point of the list", required = true) @PathParam("offset") int offset, - @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size, - @ApiParam(value = "type of the stream. Possible values are \"liveStream\", \"ipCamera\", \"streamSource\", \"VoD\"", required = true) @PathParam("type") String type, - @ApiParam(value = "field to sort", required = false) @QueryParam("sort_by") String sortBy, - @ApiParam(value = "asc for Ascending, desc Descending order", required = false) @QueryParam("order_by") String orderBy - ) { + @ApiParam(value = "size of the return list (max:50 )", required = true) @PathParam("size") int size, + @ApiParam(value = "type of the stream. Possible values are \"liveStream\", \"ipCamera\", \"streamSource\", \"VoD\"", required = true) @PathParam("type") String type, + @ApiParam(value = "field to sort", required = false) @QueryParam("sort_by") String sortBy, + @ApiParam(value = "asc for Ascending, desc Descending order", required = false) @QueryParam("order_by") String orderBy + ) { return getDataStore().getBroadcastList(offset, size, type, sortBy, orderBy, null); } @@ -893,14 +828,14 @@ public List filterBroadcastListV2(@ApiParam(value = "starting point o @Path("/{id}/recording/{recording-status}") @Produces(MediaType.APPLICATION_JSON) public Result enableRecording(@ApiParam(value = "the id of the stream", required = true) @PathParam("id") String streamId, - @ApiParam(value = "Change recording status. If true, starts recording. If false stop recording", required = true) @PathParam("recording-status") boolean enableRecording, - @ApiParam(value = "Record type: 'mp4' or 'webm'. It's optional parameter.", required = false) @QueryParam("recordType") String recordType, - @ApiParam(value = "Resolution height of the broadcast that is wanted to record. ", required = false) @QueryParam("resolutionHeight") int resolutionHeight - ) { + @ApiParam(value = "Change recording status. If true, starts recording. If false stop recording", required = true) @PathParam("recording-status") boolean enableRecording, + @ApiParam(value = "Record type: 'mp4' or 'webm'. It's optional parameter.", required = false) @QueryParam("recordType") String recordType, + @ApiParam(value = "Resolution height of the broadcast that is wanted to record. ", required = false) @QueryParam("resolutionHeight") int resolutionHeight + ) { if (logger.isInfoEnabled()) { logger.info("Recording method is called for {} to make it {} and record Type: {} resolution:{}", streamId.replaceAll(REPLACE_CHARS, "_"), enableRecording, recordType != null ? recordType.replaceAll(REPLACE_CHARS, "_") : null, resolutionHeight); } - recordType = (recordType==null) ? RecordType.MP4.toString() : recordType; // It means, if recordType is null, function using Mp4 Record by default + recordType = (recordType == null) ? RecordType.MP4.toString() : recordType; // It means, if recordType is null, function using Mp4 Record by default return enableRecordMuxing(streamId, enableRecording, recordType, resolutionHeight); } @@ -919,8 +854,7 @@ public Result getCameraErrorV2(@ApiParam(value = "StreamId of the IP Camera Stre @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}/start") @Produces(MediaType.APPLICATION_JSON) - public Result startStreamSourceV2(@ApiParam(value = "the id of the stream. The broadcast type should be IP Camera or Stream Source otherwise it does not work", required = true) @PathParam("id") String id) - { + public Result startStreamSourceV2(@ApiParam(value = "the id of the stream. The broadcast type should be IP Camera or Stream Source otherwise it does not work", required = true) @PathParam("id") String id) { return super.startStreamSource(id); } @@ -929,8 +863,7 @@ public Result startStreamSourceV2(@ApiParam(value = "the id of the stream. The b @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}/stop") @Produces(MediaType.APPLICATION_JSON) - public Result stopStreamingV2(@ApiParam(value = "the id of the broadcast.", required = true) @PathParam("id") String id) - { + public Result stopStreamingV2(@ApiParam(value = "the id of the broadcast.", required = true) @PathParam("id") String id) { return super.stopStreaming(id); } @@ -965,11 +898,11 @@ public String[] getOnvifDeviceProfiles(@ApiParam(value = "The id of the IP Camer @Path("/{id}/ip-camera/move") @Produces(MediaType.APPLICATION_JSON) public Result moveIPCamera(@ApiParam(value = "The id of the IP Camera", required = true) @PathParam("id") String id, - @ApiParam(value = "Movement in X direction. If not specified, it's assumed to be zero. Valid ranges between -1.0f and 1.0f for all movements ", required = false) @QueryParam("valueX") Float valueX, - @ApiParam(value = "Movement in Y direction. If not specified, it's assumed to be zero. Valid ranges between -1.0f and 1.0f for all movements ", required = false) @QueryParam("valueY") Float valueY, - @ApiParam(value = "Movement in Zoom. If not specified, it's assumed to be zero. Valid ranges for relative and continous move is between -1.0f and 1.0f. For absolute move between 0.0f and 1.0f ", required = false) @QueryParam("valueZ") Float valueZ, - @ApiParam(value = "Movement type. It can be absolute, relative or continuous. If not specified, it's relative", required = false) @QueryParam("movement") String movement - ) { + @ApiParam(value = "Movement in X direction. If not specified, it's assumed to be zero. Valid ranges between -1.0f and 1.0f for all movements ", required = false) @QueryParam("valueX") Float valueX, + @ApiParam(value = "Movement in Y direction. If not specified, it's assumed to be zero. Valid ranges between -1.0f and 1.0f for all movements ", required = false) @QueryParam("valueY") Float valueY, + @ApiParam(value = "Movement in Zoom. If not specified, it's assumed to be zero. Valid ranges for relative and continous move is between -1.0f and 1.0f. For absolute move between 0.0f and 1.0f ", required = false) @QueryParam("valueZ") Float valueZ, + @ApiParam(value = "Movement type. It can be absolute, relative or continuous. If not specified, it's relative", required = false) @QueryParam("movement") String movement + ) { boolean result = false; String message = STREAM_ID_NOT_VALID; if (id != null && StreamIdValidator.isStreamIdValid(id)) { @@ -992,35 +925,30 @@ public Result moveIPCamera(@ApiParam(value = "The id of the IP Camera", required if (movement.equals(RELATIVE_MOVE)) { result = super.moveRelative(id, valueX, valueY, valueZ); - } - else if (movement.equals(CONTINUOUS_MOVE)) { + } else if (movement.equals(CONTINUOUS_MOVE)) { result = super.moveContinous(id, valueX, valueY, valueZ); - } - else if (movement.equals(ABSOLUTE_MOVE)) { + } else if (movement.equals(ABSOLUTE_MOVE)) { result = super.moveAbsolute(id, valueX, valueY, valueZ); - } - else { + } else { message = "Movement type is not supported. Supported types are continous, relative and absolute but was " + movement; - } + } } return new Result(result, message); } - @ApiOperation(value="Stop move for IP Camera.", response = Result.class) + @ApiOperation(value = "Stop move for IP Camera.", response = Result.class) @POST @Path("/{id}/ip-camera/stop-move") @Produces(MediaType.APPLICATION_JSON) public Result stopMove(@ApiParam(value = "the id of the IP Camera", required = true) @PathParam("id") String id) { boolean result = false; String message = STREAM_ID_NOT_VALID; - if (id != null && StreamIdValidator.isStreamIdValid(id)) - { + if (id != null && StreamIdValidator.isStreamIdValid(id)) { OnvifCamera camera = getApplication().getOnvifCamera(id); if (camera != null) { result = camera.moveStop(); message = ""; - } - else { + } else { message = "Camera not found"; } } @@ -1029,10 +957,10 @@ public Result stopMove(@ApiParam(value = "the id of the IP Camera", required = t @ApiOperation(value = "Creates a conference room with the parameters. The room name is key so if this is called with the same room name then new room is overwritten to old one", response = ConferenceRoom.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "If operation is no completed for any reason", response=Result.class), + @ApiResponses(value = {@ApiResponse(code = 400, message = "If operation is no completed for any reason", response = Result.class), @ApiResponse(code = 200, message = "Returns the created conference room", response = ConferenceRoom.class)}) @POST - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms") @Produces(MediaType.APPLICATION_JSON) public Response createConferenceRoomV2(@ApiParam(value = "Conference Room object with start and end date", required = true) ConferenceRoom room) { @@ -1046,15 +974,15 @@ public Response createConferenceRoomV2(@ApiParam(value = "Conference Room object } @ApiOperation(value = "Edits previously saved conference room", response = Response.class) - @ApiResponses(value = { @ApiResponse(code = 400, message = "If operation is no completed for any reason", response=Result.class), + @ApiResponses(value = {@ApiResponse(code = 400, message = "If operation is no completed for any reason", response = Result.class), @ApiResponse(code = 200, message = "Returns the updated Conference room", response = ConferenceRoom.class)}) @PUT - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}") @Produces(MediaType.APPLICATION_JSON) - public Response editConferenceRoom(@ApiParam(value="Room id") @PathParam("room_id") String roomId, @ApiParam(value = "Conference Room object with start and end date", required = true) ConferenceRoom room) { + public Response editConferenceRoom(@ApiParam(value = "Room id") @PathParam("room_id") String roomId, @ApiParam(value = "Conference Room object with start and end date", required = true) ConferenceRoom room) { - if(room != null && getDataStore().editConferenceRoom(roomId, room)) { + if (room != null && getDataStore().editConferenceRoom(roomId, room)) { return Response.status(Status.OK).entity(room).build(); } return Response.status(Status.BAD_REQUEST).entity(new Result(false, "Operation not completed")).build(); @@ -1062,7 +990,7 @@ public Response editConferenceRoom(@ApiParam(value="Room id") @PathParam("room_i @ApiOperation(value = "Deletes a conference room. The room id is key so if this is called with the same room id then new room is overwritten to old one", response = Result.class) @DELETE - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}") @Produces(MediaType.APPLICATION_JSON) public Result deleteConferenceRoomV2(@ApiParam(value = "the id of the conference room", required = true) @PathParam("room_id") String roomId) { @@ -1072,7 +1000,7 @@ public Result deleteConferenceRoomV2(@ApiParam(value = "the id of the conference public void logWarning(String message, String... arguments) { if (logger.isWarnEnabled()) { - logger.warn(message , arguments); + logger.warn(message, arguments); } } @@ -1082,14 +1010,12 @@ public void logWarning(String message, String... arguments) { @Path("/{id}/subtrack") @Produces(MediaType.APPLICATION_JSON) public Result addSubTrack(@ApiParam(value = "Broadcast id(main track)", required = true) @PathParam("id") String id, - @ApiParam(value = "Subtrack Stream Id", required = true) @QueryParam("id") String subTrackId) - { + @ApiParam(value = "Subtrack Stream Id", required = true) @QueryParam("id") String subTrackId) { Result result = new Result(false); Broadcast subTrack = getDataStore().get(subTrackId); String message = ""; - if (subTrack != null) - { + if (subTrack != null) { subTrack.setMainTrackStreamId(id); //Update subtrack's main Track Id @@ -1107,28 +1033,22 @@ public Result addSubTrack(@ApiParam(value = "Broadcast id(main track)", required addStreamToConferenceRoom(id, subTrackId, getDataStore()); } - } - else - { + } else { message = "Main track of the stream " + subTrackId + " cannot be updated"; logWarning("Main track of the stream:{} cannot be updated to {}", subTrackId.replaceAll(REPLACE_CHARS, "_"), id.replaceAll(REPLACE_CHARS, "_")); } - } - else - { + } else { message = "There is not stream with id:" + subTrackId; - logWarning("There is not stream with id:{}" , subTrackId.replaceAll(REPLACE_CHARS, "_")); + logWarning("There is not stream with id:{}", subTrackId.replaceAll(REPLACE_CHARS, "_")); } result.setMessage(message); return result; } - public void setResultSuccess(Result result, boolean success, String failMessage, String failLog, String... arguments) - { + public void setResultSuccess(Result result, boolean success, String failMessage, String failLog, String... arguments) { if (success) { result.setSuccess(true); - } - else { + } else { result.setSuccess(false); result.setMessage(failMessage); logWarning(failLog, arguments); @@ -1141,14 +1061,12 @@ public void setResultSuccess(Result result, boolean success, String failMessage, @Path("/{id}/subtrack") @Produces(MediaType.APPLICATION_JSON) public Result removeSubTrack(@ApiParam(value = "Broadcast id(main track)", required = true) @PathParam("id") String id, - @ApiParam(value = "Subtrack Stream Id", required = true) @QueryParam("id") String subTrackId) - { + @ApiParam(value = "Subtrack Stream Id", required = true) @QueryParam("id") String subTrackId) { Result result = new Result(false); Broadcast subTrack = getDataStore().get(subTrackId); - if (subTrack != null) - { - if(id != null && id.equals(subTrack.getMainTrackStreamId())) { + if (subTrack != null) { + if (id != null && id.equals(subTrack.getMainTrackStreamId())) { subTrack.setMainTrackStreamId(""); } @@ -1159,69 +1077,62 @@ public Result removeSubTrack(@ApiParam(value = "Broadcast id(main track)", requi setResultSuccess(result, success, "Subtrack:" + subTrackId + " cannot be removed from main track: " + id, "Subtrack:{} cannot be removed from main track:{} ", subTrackId.replaceAll(REPLACE_CHARS, "_"), id != null ? id.replaceAll(REPLACE_CHARS, "_") : null); - } - else - { - setResultSuccess(result, false, "Main track of the stream " + subTrackId + " which is " + id +" cannot be updated", + } else { + setResultSuccess(result, false, "Main track of the stream " + subTrackId + " which is " + id + " cannot be updated", "Main track of the stream:{} cannot be updated to {}", subTrackId.replaceAll(REPLACE_CHARS, "_"), id != null ? id.replaceAll(REPLACE_CHARS, "_") : null); } - } - else - { - setResultSuccess(result, false, "There is no stream with id:" + subTrackId, "There is no stream with id:{}" , subTrackId.replaceAll(REPLACE_CHARS, "_")); + } else { + setResultSuccess(result, false, "There is no stream with id:" + subTrackId, "There is no stream with id:{}", subTrackId.replaceAll(REPLACE_CHARS, "_")); } return result; } - @ApiOperation(value = "Returns the stream info(width, height, bitrates and video codec) of the stream", response= BasicStreamInfo[].class) + @ApiOperation(value = "Returns the stream info(width, height, bitrates and video codec) of the stream", response = BasicStreamInfo[].class) @GET @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}/stream-info") @Produces(MediaType.APPLICATION_JSON) - public BasicStreamInfo[] getStreamInfo(@PathParam("id") String streamId) - { + public BasicStreamInfo[] getStreamInfo(@PathParam("id") String streamId) { boolean isCluster = getAppContext().containsBean(IClusterNotifier.BEAN_NAME); List streamInfoList; if (isCluster) { streamInfoList = getDataStore().getStreamInfoList(streamId); - } - else { + } else { IWebRTCAdaptor webRTCAdaptor = (IWebRTCAdaptor) getAppContext().getBean(IWebRTCAdaptor.BEAN_NAME); streamInfoList = webRTCAdaptor.getStreamInfo(streamId); } BasicStreamInfo[] basicStreamInfo = new BasicStreamInfo[0]; - if (streamInfoList != null) - { + if (streamInfoList != null) { basicStreamInfo = new BasicStreamInfo[streamInfoList.size()]; for (int i = 0; i < basicStreamInfo.length; i++) { IStreamInfo iStreamInfo = streamInfoList.get(i); - basicStreamInfo[i] = new BasicStreamInfo(iStreamInfo.getVideoHeight(), iStreamInfo.getVideoWidth(), + basicStreamInfo[i] = new BasicStreamInfo(iStreamInfo.getVideoHeight(), iStreamInfo.getVideoWidth(), iStreamInfo.getVideoBitrate(), iStreamInfo.getAudioBitrate(), iStreamInfo.getVideoCodec()); } } return basicStreamInfo; - } + } @ApiOperation(value = "Send stream participants a message through Data Channel in a WebRTC stream", notes = "", response = Result.class) @POST @Consumes(MediaType.APPLICATION_JSON) @Path("/{id}/data") @Produces(MediaType.APPLICATION_JSON) - public Result sendMessage(@ApiParam(value = "Message through Data Channel which will be sent to all WebRTC stream participants", required = true) String message, - @ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id) { + public Result sendMessage(@ApiParam(value = "Message through Data Channel which will be sent to all WebRTC stream participants", required = true) String message, + @ApiParam(value = "Broadcast id", required = true) @PathParam("id") String id) { AntMediaApplicationAdapter application = getApplication(); // check if WebRTC data channels are supported in this edition - if(application != null && application.isDataChannelMessagingSupported()) { + if (application != null && application.isDataChannelMessagingSupported()) { // check if data channel is enabled in the settings - if(application.isDataChannelEnabled()) { + if (application.isDataChannelEnabled()) { // check if stream with given stream id exists - if(application.doesWebRTCStreamExist(id)) { + if (application.doesWebRTCStreamExist(id)) { // send the message through the application - boolean status = application.sendDataChannelMessage(id,message); - if(status) { + boolean status = application.sendDataChannelMessage(id, message); + if (status) { return new Result(true); } else { return new Result(false, "Operation not completed"); @@ -1239,21 +1150,22 @@ public Result sendMessage(@ApiParam(value = "Message through Data Channel which return new Result(false, "Operation not supported in the Community Edition. Check the Enterprise version for more features."); } } - @ApiOperation(value = "Gets the conference room list from database", notes = "",responseContainer = "List", response = ConferenceRoom.class) + + @ApiOperation(value = "Gets the conference room list from database", notes = "", responseContainer = "List", response = ConferenceRoom.class) @GET @Path("/conference-rooms/list/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List getConferenceRoomList(@ApiParam(value = "This is the offset of the list, it is useful for pagination. If you want to use sort mechanism, we recommend using Mongo DB.", required = true) @PathParam("offset") int offset, - @ApiParam(value = "Number of items that will be fetched. If there is not enough item in the datastore, returned list size may less then this value", required = true) @PathParam("size") int size, - @ApiParam(value = "field to sort", required = false) @QueryParam("sort_by") String sortBy, - @ApiParam(value = "asc for Ascending, desc Descending order", required = false) @QueryParam("order_by") String orderBy, - @ApiParam(value = "Search parameter, returns specific items that contains search string", required = false) @QueryParam("search") String search - ) { - return getDataStore().getConferenceRoomList(offset, size ,sortBy, orderBy, search); + @ApiParam(value = "Number of items that will be fetched. If there is not enough item in the datastore, returned list size may less then this value", required = true) @PathParam("size") int size, + @ApiParam(value = "field to sort", required = false) @QueryParam("sort_by") String sortBy, + @ApiParam(value = "asc for Ascending, desc Descending order", required = false) @QueryParam("order_by") String orderBy, + @ApiParam(value = "Search parameter, returns specific items that contains search string", required = false) @QueryParam("search") String search + ) { + return getDataStore().getConferenceRoomList(offset, size, sortBy, orderBy, search); } @ApiOperation(value = "Get conference room object") - @ApiResponses(value = { @ApiResponse(code = 200, message = "Return the ConferenceRoom object"), + @ApiResponses(value = {@ApiResponse(code = 200, message = "Return the ConferenceRoom object"), @ApiResponse(code = 404, message = "ConferenceRoom object not found")}) @GET @Path("/conference-rooms/{roomId}") @@ -1265,71 +1177,70 @@ public Response getConferenceRoom(@ApiParam(value = "id of the room", required = } if (room != null) { return Response.status(Status.OK).entity(room).build(); - } - else { + } else { return Response.status(Status.NOT_FOUND).build(); } } - @ApiOperation(value="Returns the streams Ids in the room.",responseContainer ="List",response = String.class) + @ApiOperation(value = "Returns the streams Ids in the room.", responseContainer = "List", response = String.class) @GET - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}/room-info") @Produces(MediaType.APPLICATION_JSON) - public RootRestService.RoomInfo getRoomInfo(@ApiParam(value="Room id", required=true) @PathParam("room_id") String roomId, - @ApiParam(value="If Stream Id is entered, that stream id will be isolated from the result",required = false) @QueryParam("streamId") String streamId){ + public RootRestService.RoomInfo getRoomInfo(@ApiParam(value = "Room id", required = true) @PathParam("room_id") String roomId, + @ApiParam(value = "If Stream Id is entered, that stream id will be isolated from the result", required = false) @QueryParam("streamId") String streamId) { ConferenceRoom room = getDataStore().getConferenceRoom(roomId); - return new RootRestService.RoomInfo(roomId,RestServiceBase.getRoomInfoFromConference(roomId,streamId,getDataStore()), room); + return new RootRestService.RoomInfo(roomId, RestServiceBase.getRoomInfoFromConference(roomId, streamId, getDataStore()), room); } - @ApiOperation(value="Adds the specified stream with streamId to the room. Use PUT conference-rooms/{room_id}/{streamId}",response = Result.class) + @ApiOperation(value = "Adds the specified stream with streamId to the room. Use PUT conference-rooms/{room_id}/{streamId}", response = Result.class) @PUT - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}/add") @Produces(MediaType.APPLICATION_JSON) - @Deprecated(since="2.6.2", forRemoval=true) - public Result addStreamToTheRoomDeprecated(@ApiParam(value="Room id", required=true) @PathParam("room_id") String roomId, - @ApiParam(value="Stream id to add to the conference room",required = true) @QueryParam("streamId") String streamId){ + @Deprecated(since = "2.6.2", forRemoval = true) + public Result addStreamToTheRoomDeprecated(@ApiParam(value = "Room id", required = true) @PathParam("room_id") String roomId, + @ApiParam(value = "Stream id to add to the conference room", required = true) @QueryParam("streamId") String streamId) { return addStreamToTheRoom(roomId, streamId); } - @ApiOperation(value="Adds the specified stream with streamId to the room. ",response = Result.class) + @ApiOperation(value = "Adds the specified stream with streamId to the room. ", response = Result.class) @PUT - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}/{streamId}") @Produces(MediaType.APPLICATION_JSON) - public Result addStreamToTheRoom(@ApiParam(value="Room id", required=true) @PathParam("room_id") String roomId, - @ApiParam(value="Stream id to add to the conference room",required = true) @PathParam("streamId") String streamId){ + public Result addStreamToTheRoom(@ApiParam(value = "Room id", required = true) @PathParam("room_id") String roomId, + @ApiParam(value = "Stream id to add to the conference room", required = true) @PathParam("streamId") String streamId) { - boolean result = BroadcastRestService.addStreamToConferenceRoom(roomId,streamId,getDataStore()); - if(result) { + boolean result = BroadcastRestService.addStreamToConferenceRoom(roomId, streamId, getDataStore()); + if (result) { getApplication().joinedTheRoom(roomId, streamId); } return new Result(result); } - @ApiOperation(value="Deletes the specified stream correlated with streamId in the room. Use DELETE /conference-rooms/{room_id}/{streamId}",response = Result.class) + @ApiOperation(value = "Deletes the specified stream correlated with streamId in the room. Use DELETE /conference-rooms/{room_id}/{streamId}", response = Result.class) @PUT - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}/delete") @Produces(MediaType.APPLICATION_JSON) - @Deprecated(since="2.6.2", forRemoval=true) - public Result deleteStreamFromTheRoomDeprecated(@ApiParam(value="Room id", required=true) @PathParam("room_id") String roomId, - @ApiParam(value="Stream id to delete from the conference room",required = true) @QueryParam("streamId") String streamId){ + @Deprecated(since = "2.6.2", forRemoval = true) + public Result deleteStreamFromTheRoomDeprecated(@ApiParam(value = "Room id", required = true) @PathParam("room_id") String roomId, + @ApiParam(value = "Stream id to delete from the conference room", required = true) @QueryParam("streamId") String streamId) { return deleteStreamFromTheRoom(roomId, streamId); } - @ApiOperation(value="Deletes the specified stream correlated with streamId in the room. Use ",response = Result.class) + @ApiOperation(value = "Deletes the specified stream correlated with streamId in the room. Use ", response = Result.class) @DELETE - @Consumes({ MediaType.APPLICATION_JSON }) + @Consumes({MediaType.APPLICATION_JSON}) @Path("/conference-rooms/{room_id}/{streamId}") @Produces(MediaType.APPLICATION_JSON) - public Result deleteStreamFromTheRoom(@ApiParam(value="Room id", required=true) @PathParam("room_id") String roomId, - @ApiParam(value="Stream id to delete from the conference room",required = true) @PathParam("streamId") String streamId){ - boolean result = RestServiceBase.removeStreamFromRoom(roomId,streamId,getDataStore()); - if(result) { + public Result deleteStreamFromTheRoom(@ApiParam(value = "Room id", required = true) @PathParam("room_id") String roomId, + @ApiParam(value = "Stream id to delete from the conference room", required = true) @PathParam("streamId") String streamId) { + boolean result = RestServiceBase.removeStreamFromRoom(roomId, streamId, getDataStore()); + if (result) { getApplication().leftTheRoom(roomId, streamId); } return new Result(result); @@ -1339,12 +1250,12 @@ public Result deleteStreamFromTheRoom(@ApiParam(value="Room id", required=true) @Path("/webrtc-viewers/list/{offset}/{size}") @Produces(MediaType.APPLICATION_JSON) public List getWebRTCViewerList(@ApiParam(value = "This is the offset of the list, it is useful for pagination. If you want to use sort mechanism, we recommend using Mongo DB.", required = true) @PathParam("offset") int offset, - @ApiParam(value = "Number of items that will be fetched. If there is not enough item in the datastore, returned list size may less then this value", required = true) @PathParam("size") int size, - @ApiParam(value = "field to sort", required = false) @QueryParam("sort_by") String sortBy, - @ApiParam(value = "asc for Ascending, desc Descending order", required = false) @QueryParam("order_by") String orderBy, - @ApiParam(value = "Search parameter, returns specific items that contains search string", required = false) @QueryParam("search") String search - ) { - return getDataStore().getWebRTCViewerList(offset, size ,sortBy, orderBy, search); + @ApiParam(value = "Number of items that will be fetched. If there is not enough item in the datastore, returned list size may less then this value", required = true) @PathParam("size") int size, + @ApiParam(value = "field to sort", required = false) @QueryParam("sort_by") String sortBy, + @ApiParam(value = "asc for Ascending, desc Descending order", required = false) @QueryParam("order_by") String orderBy, + @ApiParam(value = "Search parameter, returns specific items that contains search string", required = false) @QueryParam("search") String search + ) { + return getDataStore().getWebRTCViewerList(offset, size, sortBy, orderBy, search); } @ApiOperation(value = "Stop player with a specified id", response = Result.class) @@ -1352,8 +1263,7 @@ public List getWebRTCViewerList(@ApiParam(value = "This is the @Consumes(MediaType.APPLICATION_JSON) @Path("/webrtc-viewers/{webrtc-viewer-id}/stop") @Produces(MediaType.APPLICATION_JSON) - public Result stopPlaying(@ApiParam(value = "the id of the webrtc viewer.", required = true) @PathParam("webrtc-viewer-id") String viewerId) - { + public Result stopPlaying(@ApiParam(value = "the id of the webrtc viewer.", required = true) @PathParam("webrtc-viewer-id") String viewerId) { boolean result = getApplication().stopPlaying(viewerId); return new Result(result); } @@ -1364,16 +1274,15 @@ public Result stopPlaying(@ApiParam(value = "the id of the webrtc viewer.", requ @Path("/{stream_id}/id3") @Produces(MediaType.APPLICATION_JSON) public Result addID3Data(@ApiParam(value = "the id of the stream", required = true) @PathParam("stream_id") String streamId, - @ApiParam(value = "ID3 data.", required = false) String data) { - if(!getAppSettings().isId3TagEnabled()) { + @ApiParam(value = "ID3 data.", required = false) String data) { + if (!getAppSettings().isId3TagEnabled()) { return new Result(false, null, "ID3 tag is not enabled"); } MuxAdaptor muxAdaptor = getMuxAdaptor(streamId); - if(muxAdaptor != null) { + if (muxAdaptor != null) { return new Result(muxAdaptor.addID3Data(data)); - } - else { + } else { return new Result(false, null, "Stream is not available"); } } -} +} \ No newline at end of file diff --git a/src/main/java/io/antmedia/statistic/DashViewerStats.java b/src/main/java/io/antmedia/statistic/DashViewerStats.java index 22b08d22c..3728403a7 100644 --- a/src/main/java/io/antmedia/statistic/DashViewerStats.java +++ b/src/main/java/io/antmedia/statistic/DashViewerStats.java @@ -1,6 +1,7 @@ package io.antmedia.statistic; +import io.antmedia.AntMediaApplicationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -34,11 +35,11 @@ public void setApplicationContext(ApplicationContext applicationContext) { AppSettings settings = (AppSettings)applicationContext.getBean(AppSettings.BEAN_NAME); timeoutMS = getTimeoutMSFromSettings(settings, timeoutMS, DASH_TYPE); - + final AntMediaApplicationAdapter antMediaApplicationAdapter = (AntMediaApplicationAdapter)applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME); vertx.setPeriodic(DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT, yt-> { synchronized (lock) { - updateViewerCountProcess(DASH_TYPE); + updateViewerCountProcess(DASH_TYPE, antMediaApplicationAdapter); } }); } diff --git a/src/main/java/io/antmedia/statistic/HlsViewerStats.java b/src/main/java/io/antmedia/statistic/HlsViewerStats.java index f4afec123..72318f5b3 100644 --- a/src/main/java/io/antmedia/statistic/HlsViewerStats.java +++ b/src/main/java/io/antmedia/statistic/HlsViewerStats.java @@ -1,5 +1,6 @@ package io.antmedia.statistic; +import io.antmedia.AntMediaApplicationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -33,11 +34,12 @@ public void setApplicationContext(ApplicationContext applicationContext) { AppSettings settings = (AppSettings)applicationContext.getBean(AppSettings.BEAN_NAME); timeoutMS = getTimeoutMSFromSettings(settings, timeoutMS, HLS_TYPE); - + final AntMediaApplicationAdapter antMediaApplicationAdapter = (AntMediaApplicationAdapter)applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME); + vertx.setPeriodic(DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT, yt-> { synchronized (lock) { - updateViewerCountProcess(HLS_TYPE); + updateViewerCountProcess(HLS_TYPE, antMediaApplicationAdapter); } }); } diff --git a/src/main/java/io/antmedia/statistic/IStreamStats.java b/src/main/java/io/antmedia/statistic/IStreamStats.java index 00d250c1e..cc4c29ec2 100644 --- a/src/main/java/io/antmedia/statistic/IStreamStats.java +++ b/src/main/java/io/antmedia/statistic/IStreamStats.java @@ -1,13 +1,17 @@ package io.antmedia.statistic; +import io.antmedia.AntMediaApplicationAdapter; + public interface IStreamStats { /** * Register a new viewer to a stream + * * @param streamId * @param sessionId + * @param jwt */ - void registerNewViewer(String streamId, String sessionId, String subscriberId); + void registerNewViewer(String streamId, String sessionId, String subscriberId, String viewerType, String jwt, AntMediaApplicationAdapter antMediaApplicationAdapter); /** diff --git a/src/main/java/io/antmedia/statistic/ViewerStats.java b/src/main/java/io/antmedia/statistic/ViewerStats.java index 39cafc824..50817e8c7 100644 --- a/src/main/java/io/antmedia/statistic/ViewerStats.java +++ b/src/main/java/io/antmedia/statistic/ViewerStats.java @@ -6,6 +6,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import io.antmedia.AntMediaApplicationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,19 +23,21 @@ public class ViewerStats { protected static Logger logger = LoggerFactory.getLogger(ViewerStats.class); - + protected Vertx vertx; public static final String HLS_TYPE = "hls"; public static final String DASH_TYPE = "dash"; - + public static final String RTMP_TYPE = "rtmp"; + public static final String WEBRTC_TYPE = "webrtc"; + + //hls or dash private String type; private DataStore dataStore; protected DataStoreFactory dataStoreFactory; - public static final int DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT = 10000; /** @@ -43,6 +46,7 @@ public class ViewerStats { private int timePeriodMS = DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT; Map> streamsViewerMap = new ConcurrentHashMap<>(); + Map sessionId2Jwt = new ConcurrentHashMap<>(); Map sessionId2subscriberId = new ConcurrentHashMap<>(); Map increaseCounterMap = new ConcurrentHashMap<>(); @@ -55,18 +59,13 @@ public class ViewerStats { * Time out value in milliseconds, it is regarded as user is not watching stream * if last request time is older than timeout value */ - protected int timeoutMS = 20000; + protected int timeoutMS = 10000; - - - public void registerNewViewer(String streamId, String sessionId, String subscriberId) - { + public void registerNewViewer(String streamId, String sessionId, String subscriberId, String viewerType, String jwt, AntMediaApplicationAdapter antMediaApplicationAdapter) { //do not block the thread, run in vertx event queue vertx.runOnContext(h -> { - synchronized (lock) { //synchronize with database update calculations, because some odd cases may happen - Map viewerMap = streamsViewerMap.get(streamId); if (viewerMap == null) { viewerMap = new ConcurrentHashMap<>(); @@ -76,7 +75,15 @@ public void registerNewViewer(String streamId, String sessionId, String subscrib int streamIncrementCounter = getIncreaseCounterMap(streamId); streamIncrementCounter++; increaseCounterMap.put(streamId, streamIncrementCounter); - + + if(subscriberId != null && !subscriberId.equals("undefined")){ + antMediaApplicationAdapter.sendStartPlayWebHook(streamId, subscriberId, jwt, viewerType); + }else{ + antMediaApplicationAdapter.sendStartPlayWebHook(streamId, sessionId, jwt, viewerType); + } + if(jwt != null){ + sessionId2Jwt.put(sessionId, jwt); + } } viewerMap.put(sessionId, System.currentTimeMillis()); streamsViewerMap.put(streamId, viewerMap); @@ -113,7 +120,7 @@ public void registerNewViewer(String streamId, String sessionId, String subscrib }); } - + public void resetViewerMap(String streamID, String type) { Iterator> viewerIterator; @@ -128,6 +135,9 @@ public void resetViewerMap(String streamID, String type) { if(sessionId2subscriberId.containsKey(sessionId)) { sessionId2subscriberId.remove(sessionId); } + if(sessionId2Jwt.containsKey(sessionId)){ + sessionId2Jwt.remove(sessionId); + } } streamsViewerMap.get(streamID).clear(); @@ -138,7 +148,7 @@ public void resetViewerMap(String streamID, String type) { logger.info("Reset {} Stream ID: {} remove failed or null", type, streamID); } } - + public int getViewerCount(String streamId) { Map viewerMap = streamsViewerMap.get(streamId); int viewerCount = 0; @@ -234,7 +244,7 @@ public void setVertx(Vertx vertx) { this.vertx = vertx; } - public void updateViewerCountProcess(String type) { + public void updateViewerCountProcess(String viewerType, AntMediaApplicationAdapter antMediaApplicationAdapter) { Iterator>> streamIterator = streamsViewerMap.entrySet().iterator(); @@ -271,9 +281,15 @@ public void updateViewerCountProcess(String type) { // regard it as not a viewer viewerIterator.remove(); numberOfDecrement++; - + String sessionId = viewer.getKey(); String subscriberId = sessionId2subscriberId.get(sessionId); + String jwt = sessionId2Jwt.get(sessionId); + if(subscriberId !=null && !subscriberId.equals("undefined")){ + antMediaApplicationAdapter.sendStopPlayWebHook(streamId, subscriberId, jwt, viewerType); + }else{ + antMediaApplicationAdapter.sendStopPlayWebHook(streamId, sessionId, jwt, viewerType); + } // set subscriber status to not connected if(subscriberId != null) { // add a disconnected event to the subscriber @@ -300,9 +316,9 @@ public void updateViewerCountProcess(String type) { int diffCount = numberOfIncrement + numberOfDecrement; - logger.info("Update {} viewer in stream ID:{} increment count:{} decrement count:{} diff:{}", type, streamId, numberOfIncrement, numberOfDecrement, diffCount); + logger.info("Update {} viewer in stream ID:{} increment count:{} decrement count:{} diff:{}", viewerType, streamId, numberOfIncrement, numberOfDecrement, diffCount); - if(type.equals(ViewerStats.HLS_TYPE)) { + if(viewerType.equals(ViewerStats.HLS_TYPE)) { getDataStore().updateHLSViewerCount(streamViewerEntry.getKey(), diffCount); } else { @@ -338,6 +354,7 @@ public void updateViewerCountProcess(String type) { } } + public void setServerSettings(ServerSettings serverSettings) { this.serverSettings = serverSettings; diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index ae7111e2c..cc85dc371 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -6,7 +6,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.*; + import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -51,6 +52,8 @@ import org.mockito.Mockito; import org.red5.server.api.IContext; import org.red5.server.api.scope.IScope; +import org.red5.server.api.stream.IPlayItem; +import org.red5.server.api.stream.ISubscriberStream; import org.red5.server.stream.ClientBroadcastStream; import org.springframework.context.ApplicationContext; @@ -689,10 +692,10 @@ public void testNotifyHook() { AntMediaApplicationAdapter spyAdaptor = Mockito.spy(adapter); - StringBuilder notifyHook = spyAdaptor.notifyHook(null, null, null, null, null, null, null, null); + StringBuilder notifyHook = spyAdaptor.notifyHook(null, null, null, null, null, null, null, null, null, null, null); assertNull(notifyHook); - notifyHook = spyAdaptor.notifyHook("", null, null, null, null, null, null, null); + notifyHook = spyAdaptor.notifyHook("", null, null, null, null, null, null, null, null, null, null); assertNull(notifyHook); @@ -704,8 +707,10 @@ public void testNotifyHook() { String vodName = "vod name" + String.valueOf((Math.random() * 10000)); String vodId = String.valueOf((Math.random() * 10000)); + String viewerId = String.valueOf((Math.random() * 10000)); + String url = "this is url"; - notifyHook = spyAdaptor.notifyHook(url, id, action, streamName, category, vodName, vodId, null); + notifyHook = spyAdaptor.notifyHook(url, id, action, streamName, category, vodName, vodId, viewerId, null, null, null); assertNull(notifyHook); try { @@ -721,6 +726,7 @@ public void testNotifyHook() { assertEquals(category, variablesMap.get("category")); assertEquals(vodName, variablesMap.get("vodName")); assertEquals(vodId, variablesMap.get("vodId")); + assertEquals(viewerId, variablesMap.get("viewerId")); } catch (IOException e) { e.printStackTrace(); @@ -729,7 +735,7 @@ public void testNotifyHook() { url = "this is second url"; - notifyHook = spyAdaptor.notifyHook(url, id, null, null, null, null, null, null); + notifyHook = spyAdaptor.notifyHook(url, id, null, null, null, null, null, null, null, null, null); assertNull(notifyHook); try { @@ -782,6 +788,9 @@ public void testNotifyHookErrors(){ ArgumentCaptor captureCategory = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodName = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodId = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureViewerId = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureViewerType = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureToken = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureMetadata = ArgumentCaptor.forClass(String.class); /* @@ -796,7 +805,7 @@ public void testNotifyHookErrors(){ //verify that notifyHook is called 1 time verify(spyAdaptor, times(1)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -822,7 +831,7 @@ public void testNotifyHookErrors(){ //verify that notifyHook is called 1 time verify(spyAdaptor, times(2)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -847,7 +856,7 @@ public void testNotifyHookErrors(){ //verify that notifyHook is called 1 time verify(spyAdaptor, times(3)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -902,6 +911,9 @@ public void testNotifyHookFromMuxingFinished() { ArgumentCaptor captureCategory = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodName = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodId = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureViewerId = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureViewerType = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureToken = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureMetadata = ArgumentCaptor.forClass(String.class); @@ -911,7 +923,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that notifyHook is never called verify(spyAdaptor, never()).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); /* @@ -937,7 +949,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that notifyHook is called 1 time verify(spyAdaptor, times(1)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -971,7 +983,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that no new notifyHook is called verify(spyAdaptor, times(1)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); called = true; } @@ -1000,7 +1012,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that notifyHook is called 2 times verify(spyAdaptor, times(2)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureViewerType.capture(), captureToken.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -1829,6 +1841,114 @@ public void testAppDeletion() adapter.stopApplication(true); verify(dataStore, timeout(ClusterNode.NODE_UPDATE_PERIOD+1000)).close(true); } + @Test + public void testRecordStartedHook() throws Exception { + final AntMediaApplicationAdapter spyAdaptor = Mockito.spy(adapter); + AppSettings appSettings = new AppSettings(); + spyAdaptor.setAppSettings(appSettings); + + Broadcast broadcast = new Broadcast(); + assertEquals("", spyAdaptor.getListenerHookURL(broadcast)); + broadcast.setMp4Enabled(MuxAdaptor.RECORDING_ENABLED_FOR_STREAM); + String hookURL = "listener_hook_url"; + appSettings.setListenerHookURL(hookURL); + + assertEquals(hookURL, spyAdaptor.getListenerHookURL(broadcast)); + + + appSettings = new AppSettings(); + spyAdaptor.setServerSettings(new ServerSettings()); + spyAdaptor.setAppSettings(appSettings); + DataStore dataStore = new InMemoryDataStore("testHook"); + DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); + Mockito.when(dsf.getDataStore()).thenReturn(dataStore); + spyAdaptor.setDataStoreFactory(dsf); + spyAdaptor.setDataStore(dataStore); + broadcast.setStreamId("stream1"); + broadcast.setName("name"); + broadcast.setCategory("category"); + broadcast.setListenerHookURL(hookURL); + dataStore.save(broadcast); + String streamId = broadcast.getStreamId(); + + + doReturn(new StringBuilder()).when(spyAdaptor).sendPOST(anyString(),anyMap()); + + spyAdaptor.startPublish(streamId, 0, IAntMediaStreamHandler.PUBLISH_TYPE_WEBRTC); + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdaptor, times(1)).sendStartRecordWebHook(streamId); + verify(spyAdaptor,times(1)).notifyHook(broadcast.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_START_RECORD, broadcast.getName(),broadcast.getCategory(),null,null,null, null, null, broadcast.getMetaData()); + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + } + + @Test + public void testRtmpStreamPlayStop() throws Exception { + final AntMediaApplicationAdapter spyAdaptor = Mockito.spy(adapter); + AppSettings appSettings = new AppSettings(); + spyAdaptor.setAppSettings(appSettings); + + Broadcast broadcast = new Broadcast(); + assertEquals("", spyAdaptor.getListenerHookURL(broadcast)); + broadcast.setMp4Enabled(MuxAdaptor.RECORDING_ENABLED_FOR_STREAM); + String hookURL = "listener_hook_url"; + appSettings.setListenerHookURL(hookURL); + + assertEquals(hookURL, spyAdaptor.getListenerHookURL(broadcast)); + + + appSettings = new AppSettings(); + spyAdaptor.setServerSettings(new ServerSettings()); + spyAdaptor.setAppSettings(appSettings); + DataStore dataStore = new InMemoryDataStore("test"); + DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); + Mockito.when(dsf.getDataStore()).thenReturn(dataStore); + spyAdaptor.setDataStoreFactory(dsf); + spyAdaptor.setDataStore(dataStore); + broadcast.setStreamId("stream1"); + broadcast.setName("name"); + broadcast.setCategory("category"); + broadcast.setListenerHookURL(hookURL); + dataStore.save(broadcast); + String streamId = broadcast.getStreamId(); + IPlayItem item = mock(IPlayItem.class); + ISubscriberStream stream = mock(ISubscriberStream.class); + doReturn(streamId).when(item).getName(); + doReturn(new StringBuilder()).when(spyAdaptor).sendPOST(anyString(),anyMap()); + assertEquals(0, spyAdaptor.getDataStore().get(streamId).getRtmpViewerCount()); + + spyAdaptor.streamPlayItemPlay(stream, item, true); + verify(spyAdaptor, times(1)).sendStartPlayWebHook(anyString(), anyString(), any(), anyString()); + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + assertEquals(1, spyAdaptor.getDataStore().get(streamId).getRtmpViewerCount()); + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + + spyAdaptor.streamPlayItemStop(stream, item); + verify(spyAdaptor, times(1)).sendStopPlayWebHook(anyString(), anyString(), any(), anyString()); + + doReturn(null).when(item).getName(); + spyAdaptor.streamPlayItemStop(stream, item); + + doReturn(streamId).when(item).getName(); + broadcast.setListenerHookURL(null); + spyAdaptor.streamPlayItemStop(stream, item); + + } @Test public void testGetWebRTCClientMap() { diff --git a/src/test/java/io/antmedia/test/Application.java b/src/test/java/io/antmedia/test/Application.java index 1ec01055b..f23784e71 100644 --- a/src/test/java/io/antmedia/test/Application.java +++ b/src/test/java/io/antmedia/test/Application.java @@ -23,9 +23,14 @@ public class Application extends AntMediaApplicationAdapter implements IAntMedia public static boolean enableSourceHealthUpdate = false; public static List notifyVodId = new ArrayList<>();; - - + public static List notifyViewerId = new ArrayList<>();; + + public static List notifyViewerType = new ArrayList<>();; + + public static List notifyToken = new ArrayList<>();; + + @Override public void muxingFinished(String id, File file, long startTime, long duration, int resolution, String previewPath, String vodId) { super.muxingFinished(id, file, startTime, duration, resolution, previewPath, vodId); @@ -46,12 +51,14 @@ public static void resetFields() { notifyStreamName.clear(); notifyCategory.clear(); notifyVodName.clear(); - + notifyViewerId.clear(); + notifyViewerType.clear(); + notifyToken.clear(); } @Override public StringBuilder notifyHook(String url, String id, String action, String streamName, String category, - String vodName, String vodId, String metadata) { + String vodName, String vodId, String viewerId, String viewerType, String token, String metadata) { logger.info("notify hook action: {}", action); notifyHookAction.add(action); notitfyURL.add(url); @@ -60,6 +67,9 @@ public StringBuilder notifyHook(String url, String id, String action, String str notifyCategory.add(category); notifyVodName.add(vodName); notifyVodId.add(vodId); + notifyViewerId.add(viewerId); + notifyViewerType.add(viewerType); + notifyToken.add(token); return null; } diff --git a/src/test/java/io/antmedia/test/MuxerUnitTest.java b/src/test/java/io/antmedia/test/MuxerUnitTest.java index 1c5a752e3..e8682c358 100644 --- a/src/test/java/io/antmedia/test/MuxerUnitTest.java +++ b/src/test/java/io/antmedia/test/MuxerUnitTest.java @@ -24,7 +24,6 @@ import static org.bytedeco.ffmpeg.global.avutil.AV_SAMPLE_FMT_FLTP; import static org.bytedeco.ffmpeg.global.avutil.av_channel_layout_default; import static org.bytedeco.ffmpeg.global.avutil.av_dict_get; -import static org.bytedeco.ffmpeg.global.avutil.av_get_default_channel_layout; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -81,7 +80,6 @@ import org.bytedeco.ffmpeg.avformat.AVStream; import org.bytedeco.ffmpeg.avutil.AVChannelLayout; import org.bytedeco.ffmpeg.avutil.AVDictionary; -import org.bytedeco.ffmpeg.avutil.AVDictionaryEntry; import org.bytedeco.ffmpeg.avutil.AVRational; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avformat; @@ -137,11 +135,9 @@ import io.antmedia.datastore.db.MapDBStore; import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.datastore.db.types.Endpoint; -import io.antmedia.datastore.db.types.StreamInfo; import io.antmedia.integration.AppFunctionalV2Test; import io.antmedia.integration.MuxingTest; import io.antmedia.muxer.HLSMuxer; -import io.antmedia.muxer.RecordMuxer; import io.antmedia.muxer.IAntMediaStreamHandler; import io.antmedia.muxer.Mp4Muxer; import io.antmedia.muxer.MuxAdaptor; @@ -1665,7 +1661,7 @@ public void testMp4MuxingAndNotifyCallback() { Application app = (Application) applicationContext.getBean("web.handler"); AntMediaApplicationAdapter appAdaptor = Mockito.spy(app); - doReturn(new StringBuilder("")).when(appAdaptor).notifyHook(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()); + doReturn(new StringBuilder("")).when(appAdaptor).notifyHook(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()); assertNotNull(appAdaptor); //just check below value that it is not null, this is not related to this case but it should be tested diff --git a/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java b/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java index 0f83e17b2..781c6aedd 100644 --- a/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java +++ b/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java @@ -1,12 +1,8 @@ package io.antmedia.test.filter; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; import java.io.IOException; @@ -18,6 +14,8 @@ import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.statistic.ViewerStats; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.After; @@ -130,8 +128,10 @@ public void testDoFilter() { .thenReturn(context); when(filterconfig.getServletContext()).thenReturn(servletContext); - - + + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + try { dashStatisticsFilter.init(filterconfig); //when(dashStatisticsFilter.getStreamStats()).thenReturn(streamStats); @@ -160,12 +160,9 @@ public void testDoFilter() { logger.info("session id {}, stream id {}", sessionId, streamId); dashStatisticsFilter.doFilter(mockRequest, mockResponse, mockChain); - - - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); - - - + + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, null, antMediaApplicationAdapter); + } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); @@ -189,7 +186,10 @@ public void testDASHViewerLimit() { IStreamStats streamStats = mock(IStreamStats.class); when(context.getBean(DashViewerStats.BEAN_NAME)).thenReturn(streamStats); - + + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + when(context.isRunning()).thenReturn(true); DataStoreFactory dsf = mock(DataStoreFactory.class); when(context.getBean(DataStoreFactory.BEAN_NAME)).thenReturn(dsf); @@ -211,17 +211,17 @@ public void testDASHViewerLimit() { try { dashStatisticsFilter.init(filterconfig); //when(dashStatisticsFilter.getStreamStats()).thenReturn(streamStats); - + String sessionId = requestDash(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, null, antMediaApplicationAdapter); broadcast.setDashViewerCount(1); String sessionId2 = requestDash(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, ViewerStats.DASH_TYPE, null, antMediaApplicationAdapter); broadcast.setDashViewerCount(2); String sessionId3 = requestDash(streamId); - verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null); + verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, ViewerStats.DASH_TYPE,null, antMediaApplicationAdapter); } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); diff --git a/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java b/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java index 3216a3d31..4effe7be0 100644 --- a/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java +++ b/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java @@ -13,6 +13,8 @@ import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.statistic.ViewerStats; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.After; @@ -125,7 +127,8 @@ public void testDoFilter() { .thenReturn(context); when(filterconfig.getServletContext()).thenReturn(servletContext); - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); try { hlsStatisticsFilter.init(filterconfig); @@ -154,12 +157,9 @@ public void testDoFilter() { logger.info("session id {}, stream id {}", sessionId, streamId); hlsStatisticsFilter.doFilter(mockRequest, mockResponse, mockChain); - - - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); - - - + + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, null, antMediaApplicationAdapter); + } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); @@ -200,20 +200,22 @@ public void testHLSViewerLimit() { broadcast.setHlsViewerLimit(2); when(dataStore.get(streamId)).thenReturn(broadcast); - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + try { hlsStatisticsFilter.init(filterconfig); - + String sessionId = requestHls(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, null, antMediaApplicationAdapter); broadcast.setHlsViewerCount(1); String sessionId2 = requestHls(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, ViewerStats.HLS_TYPE,null, antMediaApplicationAdapter); broadcast.setHlsViewerCount(2); String sessionId3 = requestHls(streamId); - verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null); + verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, ViewerStats.HLS_TYPE, null, antMediaApplicationAdapter); } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); diff --git a/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java b/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java index d116e6591..eecde92ca 100644 --- a/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java +++ b/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java @@ -3091,6 +3091,8 @@ public void testWebRTCViewerRestOperations(){ restServiceSpy.setApplication(testApp); restServiceSpy.stopPlaying(viewerId); verify(testApp, times(1)).stopPlaying(viewerId); + + } @Test diff --git a/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java b/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java index c371e5764..5fd9cef9d 100644 --- a/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java +++ b/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java @@ -4,20 +4,24 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; import org.awaitility.Awaitility; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.red5.server.api.scope.IScope; import org.springframework.context.ApplicationContext; -import ch.qos.logback.classic.Logger; import io.antmedia.AntMediaApplicationAdapter; import io.antmedia.AppSettings; import io.antmedia.datastore.db.DataStore; @@ -34,8 +38,8 @@ public class DashViewerStatsTest { - static Vertx vertx; - + static Vertx vertx; + @BeforeClass public static void beforeClass() { vertx = io.vertx.core.Vertx.vertx(); @@ -64,10 +68,11 @@ public void testDASHViewerCount() { // TODO Auto-generated catch block e.printStackTrace(); } + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); for (int i = 0; i < 100; i++) { String sessionId = String.valueOf((Math.random() * 999999)); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -81,7 +86,7 @@ public void testDASHViewerCount() { //Add same session ID for (int i = 0; i < 10; i++) { String sessionId = "sameSessionID"; - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -116,7 +121,9 @@ public void testSubscriberEvents() { String sessionId = String.valueOf((Math.random() * 999999)); // check if viewer is added - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; @@ -156,6 +163,10 @@ public void testGetTimeout() { @Test public void testSetApplicationContextSubscribers() { ApplicationContext context = mock(ApplicationContext.class); + AntMediaApplicationAdapter adapter = new AntMediaApplicationAdapter(); + AppSettings appSettings = new AppSettings(); + adapter.setAppSettings(appSettings); + adapter.setVertx(vertx); try { @@ -175,20 +186,39 @@ public void testSetApplicationContextSubscribers() { when(context.getBean(AppSettings.BEAN_NAME)).thenReturn(settings); when(context.getBean(ServerSettings.BEAN_NAME)).thenReturn(new ServerSettings()); - + + IScope scope = mock(IScope.class); + + when(scope.getName()).thenReturn("junit"); + adapter.setScope(scope); + adapter.setDataStoreFactory(dsf); + AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); + + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(spyAdapter); + + CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); + Mockito.doReturn(httpClient).when(spyAdapter).getHttpClient(); + + CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); + Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + Mockito.when(httpResponse.getStatusLine()).thenReturn(Mockito.mock(StatusLine.class)); + + Mockito.when(httpResponse.getEntity()).thenReturn(null); + DashViewerStats viewerStats = new DashViewerStats(); viewerStats.setTimePeriodMS(1000); viewerStats.setApplicationContext(context); - Broadcast broadcast = new Broadcast(); broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING); broadcast.setName("name"); + broadcast.setListenerHookURL("url"); dsf.setWriteStatsToDatastore(true); dsf.setApplicationContext(context); String streamId = dsf.getDataStore().save(broadcast); + assertEquals(1000, viewerStats.getTimePeriodMS()); assertEquals(10000, viewerStats.getTimeoutMS()); @@ -216,11 +246,27 @@ public void testSetApplicationContextSubscribers() { subscriberPlay3.setSubscriberId("subscriber3"); subscriberPlay3.setB32Secret("6qsp6qhndryqs56zjmvs37i6gqtjsdvc"); subscriberPlay3.setType(Subscriber.PLAY_TYPE); - dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); - - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); - viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay.getSubscriberId()); - + dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); + + //spyAdapter.setDataStoreFactory(dsf); + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, null, spyAdapter); + viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, null, spyAdapter); + + + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdapter, times(2)).sendStartPlayWebHook(streamId, subscriberPlay.getSubscriberId(), null, ViewerStats.DASH_TYPE); + verify(spyAdapter,times(2)).notifyHook(broadcast.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_START_PLAY, broadcast.getName(),broadcast.getCategory(),null,null,subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, null, broadcast.getMetaData()); + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 2 ); @@ -229,11 +275,11 @@ public void testSetApplicationContextSubscribers() { Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getTotalViewerCount() == 2 ); - + //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId()); - viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay2.getSubscriberId()); - + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, null, spyAdapter); + viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, null, spyAdapter); + Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; @@ -248,7 +294,6 @@ public void testSetApplicationContextSubscribers() { return subData.isConnected() && subData.getCurrentConcurrentConnections() == 2 && eventExist; }); - // Check viewer is online Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getDashViewerCount() == 2); @@ -256,7 +301,24 @@ public void testSetApplicationContextSubscribers() { // Wait some time for detect disconnect Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getDashViewerCount() == 0); - + + + Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdapter, times(2)).sendStopPlayWebHook(streamId, subscriberPlay2.getSubscriberId(), null, ViewerStats.DASH_TYPE); + + verify(spyAdapter,times(2)).notifyHook(broadcast.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_STOP_PLAY, broadcast.getName(),broadcast.getCategory(),null,null,subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, null, broadcast.getMetaData()); + + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + + assertEquals(0, viewerStats.getViewerCount(streamId)); assertEquals(0, viewerStats.getIncreaseCounterMap(streamId)); assertEquals(0, viewerStats.getTotalViewerCount()); @@ -278,7 +340,7 @@ public void testSetApplicationContextSubscribers() { ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId()); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), ViewerStats.DASH_TYPE,null, spyAdapter); Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); @@ -309,7 +371,27 @@ public void testSetApplicationContextSubscribers() { assertEquals(2, events2.size()); ConnectionEvent eventDis2 = events2.get(1); - assertSame(ConnectionEvent.DISCONNECTED_EVENT, eventDis2.getEventType()); + assertSame(ConnectionEvent.DISCONNECTED_EVENT, eventDis2.getEventType()); + + broadcast.setListenerHookURL(null); + appSettings.setListenerHookURL("url"); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, null, spyAdapter); + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdapter, times(3)).sendStartPlayWebHook(streamId, subscriberPlay.getSubscriberId(), null, ViewerStats.DASH_TYPE); + verify(spyAdapter,times(3)).notifyHook(appSettings.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_START_PLAY, broadcast.getName(),broadcast.getCategory(),null,null,subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, null, broadcast.getMetaData()); + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + + viewerStats.registerNewViewer("", sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, null, spyAdapter); + + } catch (Exception e) { e.printStackTrace(); @@ -323,11 +405,13 @@ public void testSetApplicationContext() { ApplicationContext context = mock(ApplicationContext.class); try { - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when((AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); DataStoreFactory dsf = new DataStoreFactory(); dsf.setDbType("memorydb"); dsf.setDbName("datastore"); when(context.getBean(DataStoreFactory.BEAN_NAME)).thenReturn(dsf); + antMediaApplicationAdapter.setDataStoreFactory(dsf); when(context.containsBean(AppSettings.BEAN_NAME)).thenReturn(true); @@ -357,9 +441,10 @@ public void testSetApplicationContext() { assertEquals(10000, viewerStats.getTimeoutMS()); String sessionId = "sessionId" + (int)(Math.random() * 10000); + antMediaApplicationAdapter.setAppSettings(settings); + + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); - viewerStats.registerNewViewer(streamId, sessionId, null); - Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 1 ); @@ -370,7 +455,7 @@ public void testSetApplicationContext() { ()->viewerStats.getTotalViewerCount() == 1 ); //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); // Check viewer is online Awaitility.await().atMost(30, TimeUnit.SECONDS).until( @@ -391,9 +476,8 @@ public void testSetApplicationContext() { Awaitility.await().atMost(30, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - - viewerStats.registerNewViewer(streamId, sessionId, null); - + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); diff --git a/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java b/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java index e53a54f9e..6e22a1361 100644 --- a/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java +++ b/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java @@ -65,10 +65,11 @@ public void testHLSViewerCount() { // TODO Auto-generated catch block e.printStackTrace(); } + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); for (int i = 0; i < 100; i++) { String sessionId = String.valueOf((Math.random() * 999999)); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -82,7 +83,7 @@ public void testHLSViewerCount() { //Add same session ID for (int i = 0; i < 10; i++) { String sessionId = "sameSessionID"; - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -118,8 +119,12 @@ public void testSubscriberEvents() { dataStore.addSubscriber(subscriberPlay.getStreamId(), subscriberPlay); String sessionId = String.valueOf((Math.random() * 999999)); + + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + + // check if viewer is added - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), "jwt", null, antMediaApplicationAdapter); Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; @@ -167,26 +172,30 @@ public void testSetApplicationContextSubscribers() { when(context.getBean(DataStoreFactory.BEAN_NAME)).thenReturn(dsf); when(context.containsBean(AppSettings.BEAN_NAME)).thenReturn(true); - - when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); - AppSettings settings = new AppSettings(); + when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); + AppSettings settings = mock(AppSettings.class); + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when((AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + antMediaApplicationAdapter.setDataStoreFactory(dsf); + antMediaApplicationAdapter.setAppSettings(settings); + when(settings.isWriteStatsToDatastore()).thenReturn(true); //set hls time to 1 settings.setHlsTime("1"); - + when(context.getBean(AppSettings.BEAN_NAME)).thenReturn(settings); when(context.getBean(ServerSettings.BEAN_NAME)).thenReturn(new ServerSettings()); - + HlsViewerStats viewerStats = new HlsViewerStats(); - + viewerStats.setTimePeriodMS(1000); viewerStats.setApplicationContext(context); - + Broadcast broadcast = new Broadcast(); broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING); broadcast.setName("name"); - + dsf.setWriteStatsToDatastore(true); dsf.setApplicationContext(context); String streamId = dsf.getDataStore().save(broadcast); @@ -195,7 +204,7 @@ public void testSetApplicationContextSubscribers() { assertEquals(10000, viewerStats.getTimeoutMS()); String sessionId = "sessionId" + (int)(Math.random() * 10000); - + // create a subscriber play Subscriber subscriberPlay = new Subscriber(); subscriberPlay.setStreamId(streamId); @@ -203,118 +212,118 @@ public void testSetApplicationContextSubscribers() { subscriberPlay.setB32Secret("6qsp6qhndryqs56zjmvs37i6gqtjsdvc"); subscriberPlay.setType(Subscriber.PLAY_TYPE); dsf.getDataStore().addSubscriber(subscriberPlay.getStreamId(), subscriberPlay); - + Subscriber subscriberPlay2 = new Subscriber(); subscriberPlay2.setStreamId(streamId); subscriberPlay2.setSubscriberId("subscriber2"); subscriberPlay2.setB32Secret("6qsp6qhndryqs56zjmvs37i6gqtjsdvc"); subscriberPlay2.setType(Subscriber.PLAY_TYPE); - dsf.getDataStore().addSubscriber(subscriberPlay2.getStreamId(), subscriberPlay2); - + dsf.getDataStore().addSubscriber(subscriberPlay2.getStreamId(), subscriberPlay2); + Subscriber subscriberPlay3 = new Subscriber(); subscriberPlay3.setStreamId(streamId); subscriberPlay3.setSubscriberId("subscriber3"); subscriberPlay3.setB32Secret("6qsp6qhndryqs56zjmvs37i6gqtjsdvc"); subscriberPlay3.setType(Subscriber.PLAY_TYPE); - dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); - - - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); - + dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), null, null, antMediaApplicationAdapter); + Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 1 ); Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getIncreaseCounterMap(streamId) == 1 ); - + Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getTotalViewerCount() == 1 ); - + + //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId()); - + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), null, null, antMediaApplicationAdapter); + Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; Subscriber subData = dsf.getDataStore().getSubscriber(streamId, subscriberPlay.getSubscriberId()); - + List events = subData.getStats().getConnectionEvents(); - + if(events.size() == 1) { ConnectionEvent event = events.get(0); - eventExist = ConnectionEvent.CONNECTED_EVENT == event.getEventType(); + eventExist = ConnectionEvent.CONNECTED_EVENT.equals(event.getEventType()); } - return subData.isConnected() && subData.getCurrentConcurrentConnections() == 1 && eventExist; + return subData.isConnected() && subData.getCurrentConcurrentConnections() == 1 && eventExist; }); - + // Check viewer is online Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getHlsViewerCount() == 1); - + // Wait some time for detect disconnect Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getHlsViewerCount() == 0); - + assertEquals(0, viewerStats.getViewerCount(streamId)); assertEquals(0, viewerStats.getIncreaseCounterMap(streamId)); assertEquals(0, viewerStats.getTotalViewerCount()); - - // a disconnection event should be added + + // a disconnection event should be added Subscriber subData = dsf.getDataStore().getSubscriber(streamId, subscriberPlay2.getSubscriberId()); - + List events = subData.getStats().getConnectionEvents(); - + assertEquals(2, events.size()); ConnectionEvent eventDis = events.get(1); - assertTrue(ConnectionEvent.DISCONNECTED_EVENT == eventDis.getEventType()); - - + assertTrue(ConnectionEvent.DISCONNECTED_EVENT.equals(eventDis.getEventType())); + + // Broadcast finished test broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_FINISHED); dsf.getDataStore().save(broadcast); - + Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - - - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId()); - + + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), null, null, antMediaApplicationAdapter); + Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); - + assertEquals(1, viewerStats.getViewerCount(streamId)); assertEquals(1, viewerStats.getIncreaseCounterMap(streamId)); assertEquals(1, viewerStats.getTotalViewerCount()); - + // Wait some time for detect disconnect Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getHlsViewerCount() == 0); - - // Check Viewer + + // Check Viewer Awaitility.await().atMost(10, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 0); - + Awaitility.await().atMost(10, TimeUnit.SECONDS).until( ()-> viewerStats.getIncreaseCounterMap(streamId) == 0); - + Awaitility.await().atMost(10, TimeUnit.SECONDS).until( ()-> viewerStats.getTotalViewerCount() == 0); - + Subscriber subData2 = dsf.getDataStore().getSubscriber(streamId, subscriberPlay3.getSubscriberId()); - + List events2 = subData2.getStats().getConnectionEvents(); - + assertEquals(2, events2.size()); ConnectionEvent eventDis2 = events.get(1); - assertTrue(ConnectionEvent.DISCONNECTED_EVENT == eventDis2.getEventType()); - + assertTrue(ConnectionEvent.DISCONNECTED_EVENT.equals(eventDis2.getEventType())); + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - - } + + } @Test public void testSetApplicationContext() { @@ -331,8 +340,12 @@ public void testSetApplicationContext() { when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); - AppSettings settings = new AppSettings(); - + AppSettings settings = mock(AppSettings.class); + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when((AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + antMediaApplicationAdapter.setDataStoreFactory(dsf); + antMediaApplicationAdapter.setAppSettings(settings); + when(settings.isWriteStatsToDatastore()).thenReturn(true); //set hls time to 1 settings.setHlsTime("1"); @@ -357,7 +370,7 @@ public void testSetApplicationContext() { String sessionId = "sessionId" + (int)(Math.random() * 10000); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 1 ); @@ -369,7 +382,7 @@ public void testSetApplicationContext() { ()->viewerStats.getTotalViewerCount() == 1 ); //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); // Check viewer is online Awaitility.await().atMost(20, TimeUnit.SECONDS).until( @@ -391,7 +404,7 @@ public void testSetApplicationContext() { ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, null, null, antMediaApplicationAdapter); Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); @@ -410,7 +423,7 @@ public void testSetApplicationContext() { Awaitility.await().atMost(10, TimeUnit.SECONDS).until( ()-> viewerStats.getIncreaseCounterMap(streamId) == 0); - + Awaitility.await().atMost(10, TimeUnit.SECONDS).until( ()-> viewerStats.getTotalViewerCount() == 0);