Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New webhooks playStart, playStop, recordStart issue #4666 #4738

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a029073
New webhooks playStart, playStop, recordStart issue #4666
Jan 3, 2023
88444c5
New webhooks playStart, playStop, recordStart issue #4666
Jan 3, 2023
364d9bd
Merge branch 'newWebHooks' of https://github.com/ant-media/Ant-Media-…
Jan 8, 2023
f4ec03a
New webhooks playStart, playStop, recordStart issue #4666
Jan 9, 2023
671f9fa
New webhooks playStart, playStop, recordStart issue #4666
Jan 9, 2023
a59ec87
new web hooks
Jan 10, 2023
f7297b1
New webhooks playStart, playStop, recordStart issue #4666
Jan 17, 2023
7b12efc
Merge branch 'master' into newWebHooks
lastpeony Jan 17, 2023
0f612e8
New webhooks playStart, playStop, recordStart issue #4666
Jan 17, 2023
8d66694
Merge branch 'newWebHooks' of https://github.com/ant-media/Ant-Media-…
Jan 17, 2023
878221c
New webhooks playStart, playStop, recordStart issue #4666
Jan 17, 2023
7534766
New webhooks playStart, playStop, recordStart issue #4666
Jan 17, 2023
daa8baa
New webhooks playStart, playStop, recordStart issue #4666
Jan 17, 2023
6a38708
New webhooks playStart, playStop, recordStart issue #4666
Jan 17, 2023
5919807
Merge branch 'master' into newWebHooks
Jan 19, 2023
a29f692
jwt passed to hook
Jan 27, 2023
17a73a9
Merge branch 'master' into newWebHooks
Jan 30, 2023
f68330c
stop webrtc player with play token modification.
Feb 2, 2023
26afa11
Merge branch 'master' into newWebHooks
Feb 23, 2023
9f10b5d
Merge branch 'master' into newWebHooks
Mar 3, 2023
c7fb467
Merge master
mekya Jun 13, 2023
4f6b351
Fix test case
mekya Jun 13, 2023
c50d1e9
Merge branch 'master' into newWebHooks
Jun 13, 2023
6b5c02a
refactoring rest call for backwards comp
Jun 14, 2023
f05961f
Merge branch 'master' into newWebHooks
Jun 20, 2023
e25041a
Merge branch 'master' into newWebHooks
Oct 19, 2023
6b8211b
Merge branch 'master' into newWebHooks
Oct 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ temp/

*.iml
.idea
.fleet
.fleet
153 changes: 111 additions & 42 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
import java.nio.file.Path;
import java.util.*;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Pattern;

import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -42,8 +43,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.errorprone.annotations.NoAllocation;

import io.antmedia.cluster.ClusterNode;
import io.antmedia.cluster.IClusterNotifier;
import io.antmedia.datastore.db.DataStore;
Expand All @@ -60,7 +59,6 @@
import io.antmedia.plugin.api.IFrameListener;
import io.antmedia.plugin.api.IPacketListener;
import io.antmedia.plugin.api.IStreamListener;
import io.antmedia.plugin.api.StreamParametersInfo;
import io.antmedia.rest.RestServiceBase;
import io.antmedia.rest.model.Result;
import io.antmedia.security.AcceptOnlyStreamsInDataStore;
Expand Down Expand Up @@ -93,6 +91,9 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter
public static final String HOOK_ACTION_PUBLISH_TIMEOUT_ERROR = "publishTimeoutError";
public static final String HOOK_ACTION_ENCODER_NOT_OPENED_ERROR = "encoderNotOpenedError";
public static final String HOOK_ACTION_ENDPOINT_FAILED = "endpointFailed";
public static final String HOOK_ACTION_START_PLAY = "playStart";
public static final String HOOK_ACTION_STOP_PLAY = "playStop";
public static final String HOOK_ACTION_START_RECORD = "recordStart";

public static final String STREAMS = "streams";

Expand Down Expand Up @@ -489,7 +490,8 @@ public void closeBroadcast(String streamId) {
final String category = broadcast.getCategory();
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call live stream ended hook for stream:{}",streamId );
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, metaData));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, null, null, null, metaData));

}

if (broadcast.isZombi()) {
Expand Down Expand Up @@ -542,13 +544,67 @@ public void resetDASHStats(String streamId) {
}
}

public void sendStartPlayWebHook(final String streamId, final String viewerId, final String token, final String viewerType){
final Broadcast broadcast = getDataStore().get(streamId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7% of developers fix this issue

THREAD_SAFETY_VIOLATION: Unprotected write. Non-private method AntMediaApplicationAdapter.sendStartPlayWebHook(...) indirectly writes to field this.dataStore outside of synchronization.
Reporting because another access to the same memory occurs on a background thread, although this access may not.


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

if(broadcast == null){
lastpeony marked this conversation as resolved.
Show resolved Hide resolved
return;
}
final String listenerHookURL = getListenerHookURL(broadcast);
if (listenerHookURL == null || listenerHookURL.isEmpty()) {
return;
}
sendWebHook(broadcast, listenerHookURL, viewerId, viewerType, token, HOOK_ACTION_START_PLAY);
}

public void sendStopPlayWebHook(final String streamId, final String viewerId, final String token, final String viewerType){
final Broadcast broadcast = getDataStore().get(streamId);
muratugureminoglu marked this conversation as resolved.
Show resolved Hide resolved
if(broadcast == null){
return;
}
final String listenerHookURL = getListenerHookURL(broadcast);
if (listenerHookURL == null || listenerHookURL.isEmpty()) {
return;
}
sendWebHook(broadcast, listenerHookURL, viewerId, viewerType, token, HOOK_ACTION_STOP_PLAY);
}

public void sendStartRecordWebHook(final String streamId){
final Broadcast broadcast = getDataStore().get(streamId);
muratugureminoglu marked this conversation as resolved.
Show resolved Hide resolved
if(broadcast == null){
return;
}
final String listenerHookURL = getListenerHookURL(broadcast);
if (listenerHookURL == null || listenerHookURL.isEmpty()) {
return;
}
sendWebHook(broadcast, listenerHookURL, null, null,null, HOOK_ACTION_START_RECORD);
}

private void sendWebHook(final Broadcast broadcast, final String listenerHookUrl, final String viewerId, final String viewerType, final String token, final String hookAction){
final String streamId = broadcast.getStreamId();
final String name = broadcast.getName();
final String category = broadcast.getCategory();
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call {} hook for stream:{}", hookAction, streamId);
vertx.setTimer(10, e -> notifyHook(listenerHookUrl, streamId, hookAction, name, category,
null, null, viewerId, viewerType, token, metaData));
}

private String getRtmpViewerId(){
return "rtmp_" + RandomStringUtils.randomNumeric(8);
}

@Override
public void streamPlayItemPlay(ISubscriberStream stream, IPlayItem item, boolean isLive) {
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), true));
final String streamId = item.getName();
sendStartPlayWebHook(streamId, getRtmpViewerId(), null, ViewerStats.RTMP_TYPE);
lastpeony marked this conversation as resolved.
Show resolved Hide resolved
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, true));
}
@Override
public void streamPlayItemStop(ISubscriberStream stream, IPlayItem item) {
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), false));
final String streamId = item.getName();
sendStopPlayWebHook(streamId, getRtmpViewerId(), null, ViewerStats.RTMP_TYPE);
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, false));
}

@Override
Expand All @@ -560,7 +616,6 @@ public void streamSubscriberClose(ISubscriberStream stream) {
public void startPublish(String streamId, long absoluteStartTimeMs, String publishType) {
vertx.executeBlocking( handler -> {
try {

Broadcast broadcast = updateBroadcastStatus(streamId, absoluteStartTimeMs, publishType, getDataStore().get(streamId));

final String listenerHookURL = getListenerHookURL(broadcast);
Expand All @@ -571,7 +626,13 @@ public void startPublish(String streamId, long absoluteStartTimeMs, String publi
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call live stream started hook for stream:{}",streamId );
vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_LIVE_STREAM, name, category,
null, null, metaData));
null, null, null, null, null, metaData));
}

if ((broadcast.getMp4Enabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM || broadcast.getWebMEnabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM)
|| (appSettings.isMp4MuxingEnabled() || appSettings.isWebMMuxingEnabled())
) {
sendStartRecordWebHook(streamId);
}

int ingestingStreamLimit = appSettings.getIngestingStreamLimit();
Expand Down Expand Up @@ -740,7 +801,8 @@ public void muxingFinished(final String streamId, File file, long startTime, lon
final String metaData = (broadcast != null) ? broadcast.getMetaData() : null;
String finalListenerHookURL = listenerHookURL;
logger.info("Setting timer for calling vod ready hook for stream:{}", streamId);
vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, metaData));
vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, null, null, null, metaData));

}

String muxerFinishScript = appSettings.getMuxerFinishScript();
Expand Down Expand Up @@ -785,33 +847,24 @@ public static String getRelativePath(String filePath){

/**
* Notify hook with parameters below
*
* @param url
* is the url of the service to be called
*
* @param id
* is the stream id that is unique for each stream
*
* @param action
* is the name of the action to be notified, it has values such
* as {@link #HOOK_ACTION_END_LIVE_STREAM}
* {@link #HOOK_ACTION_START_LIVE_STREAM}
*
* @param streamName,
* name of the stream. It is not the name of the file. It is just
* a user friendly name
*
* @param category,
* category of the stream
*
* @param vodName name of the vod
*
* @param vodId id of the vod in the datastore
*
*
* @param streamName, name of the stream. It is not the name of the file. It is just
* a user friendly name
* @param category, category of the stream
* @param url is the url of the service to be called
* @param id is the stream id that is unique for each stream
* @param action is the name of the action to be notified, it has values such
* as {@link #HOOK_ACTION_END_LIVE_STREAM}
* {@link #HOOK_ACTION_START_LIVE_STREAM}
* @param vodName name of the vod
* @param vodId id of the vod in the datastore
* @param viewerId session id or subscriber id
* @param viewerType
lastpeony marked this conversation as resolved.
Show resolved Hide resolved
* @param token play token
* @return
muratugureminoglu marked this conversation as resolved.
Show resolved Hide resolved
*/
public StringBuilder notifyHook(String url, String id, String action, String streamName, String category,
String vodName, String vodId, String metadata) {
String vodName, String vodId, String viewerId, String viewerType, String token, String metadata) {
StringBuilder response = null;
logger.info("Running notify hook url:{} stream id: {} action:{} vod name:{} vod id:{}", url, id, action, vodName, vodId);
if (url != null && url.length() > 0) {
Expand All @@ -834,7 +887,19 @@ public StringBuilder notifyHook(String url, String id, String action, String str
variables.put("vodId", vodId);
}

if (metadata != null) {
if(viewerId != null){
variables.put("viewerId", viewerId);
}

if(viewerType != null){
variables.put("viewerType", viewerType);
}

if(token !=null){
variables.put("token",token);
}

if (metadata != null && !metadata.isEmpty()) {
variables.put("metadata", metadata);
}

Expand All @@ -849,7 +914,6 @@ public StringBuilder notifyHook(String url, String id, String action, String str
}

public StringBuilder sendPOST(String url, Map<String, String> variables) throws IOException {

StringBuilder response = null;
try (CloseableHttpClient httpClient = getHttpClient())
{
Expand Down Expand Up @@ -1310,7 +1374,8 @@ public synchronized void incrementEncoderNotOpenedError(String streamId) {
final String category = broadcast.getCategory();
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call encoder not opened error for stream:{}", streamId);
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, metaData));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, null, null, null, metaData));

}
}
}
Expand Down Expand Up @@ -1338,8 +1403,8 @@ public synchronized void publishTimeoutError(String streamId, String subscriberI

JSONObject jsonResponse = new JSONObject();
jsonResponse.put(WebSocketConstants.SUBSCRIBER_ID, subscriberId);

vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, jsonResponse.toJSONString()));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, null, null, null, jsonResponse.toJSONString()));
}
}
}
Expand Down Expand Up @@ -1451,7 +1516,6 @@ private boolean isEncoderSettingsValid(List<EncoderSettings> encoderSettingsList
/**
*
* @param newSettings
muratugureminoglu marked this conversation as resolved.
Show resolved Hide resolved
muratugureminoglu marked this conversation as resolved.
Show resolved Hide resolved
* @param checkUpdateTime
* @return true if timing is valid, false if it is invalid
*/
public boolean isIncomingTimeValid(AppSettings newSettings)
Expand Down Expand Up @@ -1708,7 +1772,7 @@ public void endpointFailedUpdate(String streamId, String url) {
logger.info("Setting timer to call rtmp endpoint failed hook for stream:{}", streamId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("rtmp-url", url);
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, jsonObject.toJSONString()));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, null, null, null, jsonObject.toJSONString()));
}
}
}
Expand Down Expand Up @@ -1790,9 +1854,14 @@ public void removeStreamListener(IStreamListener listener) {
streamListeners.remove(listener);
}

public boolean stopPlaying(String viewerId) {
public boolean stopPlayingByViewerId(String viewerId) {
return false;
}

public boolean stopPlayingByPlayToken(String playToken) {
return false;
}

public void stopPublish(String streamId) {
vertx.executeBlocking(handler-> closeBroadcast(streamId) , null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
if (serverSettings != null
&& serverSettings.isJwtServerControlEnabled()
&& jwtToken != null)

{
if(checkJWT(jwtToken)) {
if(checkJWTServerSettings(jwtToken)) {

chain.doFilter(request, response);
}
else {
Expand Down Expand Up @@ -247,7 +247,7 @@ private boolean scopeAccessGranted(String userScope, String dispatchUrl){
}


private boolean checkJWT( String jwtString) {
private boolean checkJWTServerSettings(String jwtString) {
boolean result = true;
try {

Expand Down
16 changes: 14 additions & 2 deletions src/main/java/io/antmedia/datastore/db/types/WebRTCViewerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ public class WebRTCViewerInfo {
@ApiModelProperty(value = "IP address of the edge to which viewer is connected")
private String edgeAddress;


/**
* Play token of the viewer
*/
@ApiModelProperty(value = "the token of the viewer")
private String tokenId;

public ObjectId getDbId() {
return dbId;
}
Expand Down Expand Up @@ -72,5 +77,12 @@ public String getEdgeAddress() {
public void setEdgeAddress(String edgeAddress) {
this.edgeAddress = edgeAddress;
}


public String getTokenId() {
return tokenId;
}

public void setTokenId(String tokenId) {
this.tokenId = tokenId;
}
}
13 changes: 12 additions & 1 deletion src/main/java/io/antmedia/filter/AbstractFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Queue;

import javax.servlet.Filter;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;

import io.antmedia.AntMediaApplicationAdapter;
import org.apache.catalina.util.NetMask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -163,5 +163,16 @@ public Broadcast getBroadcast(HttpServletRequest request, String streamId) {
}
return broadcast;
}

protected AntMediaApplicationAdapter getAntMediaApplicationAdapter(){
AntMediaApplicationAdapter antMediaApplicationAdapter = null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

11% of developers fix this issue

Var: Non-constant variable missing @var annotation


Suggested change
AntMediaApplicationAdapter antMediaApplicationAdapter = null;
@Var AntMediaApplicationAdapter antMediaApplicationAdapter = null;

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

ApplicationContext context = getAppContext();
if (context != null)
{
antMediaApplicationAdapter= (AntMediaApplicationAdapter)context.getBean(AntMediaApplicationAdapter.BEAN_NAME);
}
return antMediaApplicationAdapter;

}

}