Skip to content

Commit

Permalink
Merge pull request #313 from ant-media/release-1.4.0
Browse files Browse the repository at this point in the history
Release 1.4.0
  • Loading branch information
mekya committed Aug 2, 2018
2 parents 48600d1 + d96cdd8 commit dbcc1b2
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 60 deletions.
19 changes: 9 additions & 10 deletions src/main/java/io/antmedia/webrtc/WebRTCMuxer.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ public void registerWebRTCClient(IWebRTCClient webRTCClient) {

@Override
public boolean unRegisterWebRTCClient(IWebRTCClient webRTCClient) {
clientCount.decrementAndGet();
return webRTCClientList.remove(webRTCClient);

boolean result = webRTCClientList.remove(webRTCClient);
if (result) {
clientCount.decrementAndGet();
}
return result;
}

@Override
Expand Down Expand Up @@ -449,18 +453,10 @@ public void setStreamId(String streamId) {

}

public int getWidth() {
return width;
}

public void setWidth(int width) {
this.width = width;
}

public int getHeight() {
return height;
}

public void setHeight(int height) {
this.height = height;
}
Expand Down Expand Up @@ -504,5 +500,8 @@ public int getClientCount() {
return clientCount.intValue();
}

public void setVideoConf(byte[] videoConf) {
this.videoConf = videoConf;
}

}
82 changes: 42 additions & 40 deletions src/main/java/io/antmedia/webrtc/adaptor/Adaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,25 @@ public abstract class Adaptor implements Observer, SdpObserver
private MediaConstraints sdpMediaConstraints;
protected PeerConnectionFactory peerConnectionFactory;
protected WebSocketCommunityHandler webSocketCommunityHandler;

private String streamId;

private Session session;

protected static final Logger log = Red5LoggerFactory.getLogger(Adaptor.class);

public Adaptor(WebSocketCommunityHandler websocketCommunityHandler) {
this.webSocketCommunityHandler = websocketCommunityHandler;
}

public abstract void start();

public abstract void stop();

public void setPeerConnection(PeerConnection peerConnection) {
this.peerConnection = peerConnection;
}

public PeerConnection getPeerConnection() {
return peerConnection;
}
Expand Down Expand Up @@ -73,22 +73,22 @@ public void onIceConnectionReceivingChange(boolean receiving) {
public void onIceGatheringChange(IceGatheringState newState) {

}

@Override
public void onIceCandidate(IceCandidate candidate) {
log.warn("onIceCandidate");

webSocketCommunityHandler
.sendTakeCandidateMessage(candidate.sdpMLineIndex, candidate.sdpMid, candidate.sdp, streamId, session);
.sendTakeCandidateMessage(candidate.sdpMLineIndex, candidate.sdpMid, candidate.sdp, streamId, session);

}

@Override
public void onIceCandidatesRemoved(IceCandidate[] candidates) {
}

public void onAddStream(MediaStream stream) {}

@Override
public void onRemoveStream(MediaStream stream) {

Expand All @@ -107,46 +107,48 @@ public void onRenegotiationNeeded() {
@Override
public void onAddTrack(RtpReceiver receiver, MediaStream[] mediaStreams) {
}

@Override
public void onCreateSuccess(SessionDescription sdp) {
log.warn("onCreate Success");
peerConnection.setLocalDescription(new SdpObserver() {

@Override
public void onSetSuccess() {

}

@Override
public void onSetFailure(String error) {

}

@Override
public void onCreateSuccess(SessionDescription sdp) {

}

@Override
public void onCreateFailure(String error) {
log.warn("onCreate Success for stream: {}", streamId);
if (peerConnection != null) {
peerConnection.setLocalDescription(new SdpObserver() {

@Override
public void onSetSuccess() {
log.info("set localdescription on set success for {}", streamId);
}

@Override
public void onSetFailure(String error) {
log.info("set localdescription onSetFailure for {}", streamId);
}

@Override
public void onCreateSuccess(SessionDescription sdp) {
log.info("set localdescription onCreateSuccess for {}", streamId);
}

@Override
public void onCreateFailure(String error) {
log.info("set localdescription onCreateSuccess for {}", streamId);
}
}, sdp);
}

}
}, sdp);


String type;
if (sdp.type == Type.ANSWER) {
type = "answer";
}
else {
type = "offer";
}

webSocketCommunityHandler.sendSDPConfiguration(sdp.description, type, streamId, session);

}

@Override
public void onSetSuccess() {
log.warn("on setSuccess");
Expand All @@ -173,12 +175,12 @@ public void setSdpMediaConstraints(MediaConstraints sdpMediaConstraints) {
public void setPeerConnectionFactory(PeerConnectionFactory peerConnectionFactory) {
this.peerConnectionFactory = peerConnectionFactory;
}


public void setSession(Session session) {
this.session = session;
}

public Session getSession() {
return session;
}
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/io/antmedia/test/rest/RestServiceUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,19 @@ public void testDeleteVoD() {
when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(app);

restService.setAppCtx(context);

VoD voD = restService.getVoD(vodId);
assertEquals(vodId, voD.getVodId());
assertEquals(streamVod.getStreamId(), voD.getStreamId());
assertEquals(streamVod.getVodName(), voD.getVodName());
assertEquals(streamVod.getFilePath(), voD.getFilePath());

assertEquals(1, restService.getVodList(0, 50).size());

restService.deleteVoD(vodId);

assertEquals(0, restService.getVodList(0, 50).size());

assertNull(datastore.getVoD(vodId));

}
Expand Down
109 changes: 99 additions & 10 deletions src/test/java/io/antmedia/test/webrtc/adaptor/RTMPAdaptorTest.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package io.antmedia.test.webrtc.adaptor;

import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.json.simple.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.red5.net.websocket.WebSocketConnection;
import org.springframework.context.ApplicationContext;
import org.webrtc.IceCandidate;
import org.webrtc.MediaStream;
import org.webrtc.PeerConnection;
import org.webrtc.PeerConnectionFactory;
import org.webrtc.SessionDescription;
import org.webrtc.SessionDescription.Type;

import io.antmedia.recorder.FFmpegFrameRecorder;
import io.antmedia.webrtc.adaptor.RTMPAdaptor;
import io.antmedia.websocket.WebSocketCommunityHandler;
import io.antmedia.websocket.WebSocketConstants;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.websocket.RemoteEndpoint;
import javax.websocket.Session;

public class RTMPAdaptorTest {
Expand All @@ -27,10 +36,10 @@ public void setup() {

}


@Test
public void testOnAddStream() {

FFmpegFrameRecorder recorder = mock(FFmpegFrameRecorder.class);

WebSocketCommunityHandler webSocketHandlerReal = new WebSocketCommunityHandler() {
Expand All @@ -50,13 +59,61 @@ public ApplicationContext getAppContext() {
rtmpAdaptor.setStreamId(streamId);
Session session = mock(Session.class);
rtmpAdaptor.setSession(session);

MediaStream stream = mock(MediaStream.class);
rtmpAdaptor.onAddStream(stream);

verify(webSocketHandler).sendPublishStartedMessage(streamId, session);
}

@Test
public void testCandidate() {
FFmpegFrameRecorder recorder = mock(FFmpegFrameRecorder.class);

WebSocketCommunityHandler webSocketHandlerReal = new WebSocketCommunityHandler() {

@Override
public ApplicationContext getAppContext() {
return null;
}
};

WebSocketCommunityHandler webSocketHandler = spy(webSocketHandlerReal);

RTMPAdaptor adaptorReal = new RTMPAdaptor(recorder, webSocketHandler);
RTMPAdaptor rtmpAdaptor = spy(adaptorReal);

String streamId = "stramId" + (int)(Math.random()*10000);
rtmpAdaptor.setStreamId(streamId);
Session session = mock(Session.class);
RemoteEndpoint.Basic basicRemote = mock(RemoteEndpoint.Basic .class);
when(session.getBasicRemote()).thenReturn(basicRemote);
when(session.isOpen()).thenReturn(true);
rtmpAdaptor.setSession(session);

IceCandidate iceCandidate = new IceCandidate(RandomStringUtils.randomAlphanumeric(6), 5, RandomStringUtils.randomAlphanumeric(6));
rtmpAdaptor.onIceCandidate(iceCandidate);


verify(webSocketHandler).sendTakeCandidateMessage(iceCandidate.sdpMLineIndex, iceCandidate.sdpMid, iceCandidate.sdp, streamId, session);

JSONObject jsonObject = new JSONObject();
jsonObject.put(WebSocketConstants.COMMAND, WebSocketConstants.TAKE_CANDIDATE_COMMAND);
jsonObject.put(WebSocketConstants.CANDIDATE_LABEL, iceCandidate.sdpMLineIndex);
jsonObject.put(WebSocketConstants.CANDIDATE_ID, iceCandidate.sdpMid);
jsonObject.put(WebSocketConstants.CANDIDATE_SDP, iceCandidate.sdp);
jsonObject.put(WebSocketConstants.STREAM_ID, streamId);

try {
verify(basicRemote).sendText(jsonObject.toJSONString());
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}


}

@Test
public void testStartandStop() {

Expand All @@ -78,27 +135,59 @@ public ApplicationContext getAppContext() {
String streamId = "stramId" + (int)(Math.random()*10000);
rtmpAdaptor.setStreamId(streamId);
Session session = mock(Session.class);
RemoteEndpoint.Basic basicRemote = mock(RemoteEndpoint.Basic .class);
when(session.getBasicRemote()).thenReturn(basicRemote);
when(session.isOpen()).thenReturn(true);
rtmpAdaptor.setSession(session);

doReturn(mock(PeerConnectionFactory.class)).when(rtmpAdaptor).createPeerConnectionFactory();
PeerConnectionFactory peerConnectionFactory = mock(PeerConnectionFactory.class);

doReturn(peerConnectionFactory).when(rtmpAdaptor).createPeerConnectionFactory();

rtmpAdaptor.start();

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
rtmpAdaptor.isStarted()
);
rtmpAdaptor.isStarted()
);

verify(webSocketHandler).sendStartMessage(streamId, session);

SessionDescription sdp = new SessionDescription(Type.OFFER, RandomStringUtils.randomAlphanumeric(6));

rtmpAdaptor.onCreateSuccess(sdp);

verify(webSocketHandler).sendSDPConfiguration(sdp.description, "offer", streamId, session);
JSONObject jsonResponseObject = new JSONObject();
jsonResponseObject.put(WebSocketConstants.COMMAND, WebSocketConstants.TAKE_CONFIGURATION_COMMAND);
jsonResponseObject.put(WebSocketConstants.SDP, sdp.description);
jsonResponseObject.put(WebSocketConstants.TYPE, "offer");
jsonResponseObject.put(WebSocketConstants.STREAM_ID, streamId);
try {
verify(basicRemote).sendText(jsonResponseObject.toJSONString());
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}

rtmpAdaptor.stop();

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
rtmpAdaptor.getSignallingExecutor().isShutdown()
);
rtmpAdaptor.getSignallingExecutor().isShutdown()
);

verify(webSocketHandler).sendPublishFinishedMessage(streamId, session);

JSONObject jsonObj = new JSONObject();
jsonObj.put(WebSocketConstants.COMMAND, WebSocketConstants.NOTIFICATION_COMMAND);
jsonObj.put(WebSocketConstants.DEFINITION, WebSocketConstants.PUBLISH_FINISHED);
jsonObj.put(WebSocketConstants.STREAM_ID, streamId);
try {
verify(basicRemote).sendText(jsonObj.toJSONString());
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}

}


Expand Down
Loading

0 comments on commit dbcc1b2

Please sign in to comment.