Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue 38 restart period #67

Merged
merged 5 commits into from
May 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>io.antmedia</groupId>
<artifactId>parent</artifactId>
<version>1.3.5.1</version>
<version>1.3.6.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ant-media-server</artifactId>
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public boolean appStart(IScope app) {
@Override
public void execute(ISchedulingService service) throws CloneNotSupportedException {
streamFetcherManager = new StreamFetcherManager(AntMediaApplicationAdapter.this, dataStore,app);
streamFetcherManager.setRestartStreamFetcherPeriod(appSettings.getRestartStreamFetcherPeriod());
List<Broadcast> streams = getDataStore().getExternalStreamsList();
streamFetcherManager.startStreams(streams);

Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/antmedia/streamsource/StreamFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ public void run() {

}
catch (OutOfMemoryError e) {
logger.info("---OutOfMemoryError in thread---");
e.printStackTrace();
exceptionInThread = true;
}
Expand Down Expand Up @@ -298,6 +297,8 @@ public void run() {
thread.start();
}

logger.info("Leaving thread");

}

}
Expand Down Expand Up @@ -457,6 +458,10 @@ public boolean isRestartStream() {
public void setRestartStream(boolean restartStream) {
this.restartStream = restartStream;
}

public void setStream(Broadcast stream) {
this.stream = stream;
}


}
105 changes: 87 additions & 18 deletions src/main/java/io/antmedia/streamsource/StreamFetcherManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public class StreamFetcherManager {

private ConcurrentLinkedQueue<StreamFetcher> streamFetcherList = new ConcurrentLinkedQueue<>();

private int streamCheckerInterval = 10000;
/**
* Time period in milli seconds for checking stream fetchers status, restart issues etc.
*/
private int streamCheckerIntervalMs = 10000;

private ISchedulingService schedulingService;

Expand All @@ -45,21 +48,63 @@ public class StreamFetcherManager {

protected AtomicBoolean isJobRunning = new AtomicBoolean(false);

public StreamFetcherManager(ISchedulingService schedulingService, IDataStore datastore,IScope scope) {
public static class StreamFetcherFactory {
public StreamFetcher make(Broadcast stream, IScope scope) throws Exception {
return new StreamFetcher(stream, scope);
}
}

private boolean restartStreamAutomatically = true;

public StreamFetcherFactory streamFetcherFactory;

/**
* Time period in seconds for restarting stream fetchers
*/
private int restartStreamFetcherPeriodSeconds;

public StreamFetcherManager(ISchedulingService schedulingService, IDataStore datastore,IScope scope) {
this(schedulingService, datastore, scope, null);
}



public StreamFetcherManager(ISchedulingService schedulingService, IDataStore datastore,IScope scope, StreamFetcherFactory streamFetcherFactory) {
this.schedulingService = schedulingService;
this.datastore = datastore;
this.scope=scope;
if (streamFetcherFactory != null) {
this.streamFetcherFactory = streamFetcherFactory;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make it more readable, this might be an option:
this.streamFetcherFactory = streamFetcherFactory;
if(this.streamFetcherFactory == null){
this.streamFetcherFactory = new StreamFetcherFactory();}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check this out
e3f6052

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mekya add this also this.streamFetcherFactory = streamFetcherFactory;

}
else {
this.streamFetcherFactory = new StreamFetcherFactory();
}

}

public int getStreamCheckerInterval() {
return streamCheckerInterval;
return streamCheckerIntervalMs;
}


/**
* Set stream checker interval, this value is used in periodically checking
* the status of the stream fetchers
*
* @param streamCheckerInterval, time period of the stream fetcher check interval in milliseconds
*/
public void setStreamCheckerInterval(int streamCheckerInterval) {
this.streamCheckerInterval = streamCheckerInterval;
this.streamCheckerIntervalMs = streamCheckerInterval;
}

/**
* Set stream fetcher restart period, this value is used in periodically stopping and starting
* stream fetchers. If this value is zero it will not restart stream fetchers
*
* @param restartStreamFetcherPeriod, time period of the stream fetcher restart period in seconds
*/
public void setRestartStreamFetcherPeriod(int restartStreamFetcherPeriod) {
this.restartStreamFetcherPeriodSeconds = restartStreamFetcherPeriod;
}


Expand All @@ -68,10 +113,12 @@ public Result startStreaming(Broadcast broadcast) {
Result result=new Result(false);

try {
StreamFetcher streamScheduler = new StreamFetcher(broadcast,scope);
StreamFetcher streamScheduler = streamFetcherFactory.make(broadcast, scope);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of making StreamFetcherFactory a static class, make "make" method a static method, so there will be no need to create an instance and keep as variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making make method static is ok. On the other hand, in testing we give a mock StreamFetcherFactory instance as a parameter to create a test case. How can we do that not using an instance?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamFetcherFactory class has only "make" method. You dont need an instance to call it if you make the "make" method static.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Please just take a look at here
https://github.com/ant-media/Ant-Media-Server/blob/dev/src/test/java/io/antmedia/test/StreamFetcherUnitTest.java#L258

We give a different mock StreamFactory class. If we call StreamFactory.make static function directly how we give a separate mock in the test case above? Maybe we need to change some other things in the code as well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will not give a mock and you will not pass as parameter.

streamScheduler.setRestartStream(restartStreamAutomatically);
streamScheduler.startStream();

if(broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA)) {
String broadcastType = broadcast.getType();
if(broadcastType != null && broadcastType.equals(AntMediaApplicationAdapter.IP_CAMERA)) {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
Expand All @@ -85,6 +132,9 @@ public Result startStreaming(Broadcast broadcast) {
result.setSuccess(true);
}
streamFetcherList.add(streamScheduler);
if (streamFetcherScheduleJobName == null) {
scheduleStreamFetcherJob();
}
}
catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -113,11 +163,17 @@ public void startStreams(List<Broadcast> streams) {
startStreaming(streams.get(i));
}

scheduleStreamFetcherJob();
}

private void scheduleStreamFetcherJob() {
if (streamFetcherScheduleJobName != null) {
schedulingService.removeScheduledJob(streamFetcherScheduleJobName);
}

streamFetcherScheduleJobName = schedulingService.addScheduledJobAfterDelay(streamCheckerInterval, new IScheduledJob() {
streamFetcherScheduleJobName = schedulingService.addScheduledJobAfterDelay(streamCheckerIntervalMs, new IScheduledJob() {

private int lastRestartCount = 0;

@Override
public void execute(ISchedulingService service) throws CloneNotSupportedException {
Expand All @@ -127,10 +183,19 @@ public void execute(ISchedulingService service) throws CloneNotSupportedExceptio
streamCheckerCount++;

logger.warn("StreamFetcher Check Count :" + streamCheckerCount);

int countToRestart = 0;
if (restartStreamFetcherPeriodSeconds > 0)
{
int streamCheckIntervalSec = streamCheckerIntervalMs / 1000;
countToRestart = (streamCheckerCount * streamCheckIntervalSec) / restartStreamFetcherPeriodSeconds;
}


if (streamCheckerCount % 180 == 0) {
if (countToRestart > lastRestartCount) {

logger.info("Restarting streams");
lastRestartCount = countToRestart;
logger.info("This is {} times that restarting streams", lastRestartCount);
for (StreamFetcher streamScheduler : streamFetcherList) {

if (streamScheduler.isStreamAlive())
Expand All @@ -156,19 +221,11 @@ public void execute(ISchedulingService service) throws CloneNotSupportedExceptio
AntMediaApplicationAdapter.BROADCAST_STATUS_FINISHED);
}
}
/*
if (!streamScheduler.isThreadActive()) {
streamScheduler.startStream();
}
else {
logger.info("there is an active thread for {} so that new thread is not started", stream.getStreamId());
}
*/
}
}
}
}
}, 5000);
}, streamCheckerIntervalMs);

logger.info("StreamFetcherSchedule job name {}", streamFetcherScheduleJobName);
}
Expand All @@ -189,4 +246,16 @@ public void setStreamFetcherList(ConcurrentLinkedQueue<StreamFetcher> streamFetc
this.streamFetcherList = streamFetcherList;
}



public boolean isRestartStreamAutomatically() {
return restartStreamAutomatically;
}



public void setRestartStreamAutomatically(boolean restartStreamAutomatically) {
this.restartStreamAutomatically = restartStreamAutomatically;
}

}
100 changes: 94 additions & 6 deletions src/test/java/io/antmedia/test/StreamFetcherUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import static org.mockito.Mockito.*;
import org.mongodb.morphia.Datastore;
import org.red5.server.scheduling.QuartzSchedulingService;
import org.red5.server.scope.WebScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,7 +47,10 @@
import io.antmedia.integration.MuxingTest;
import io.antmedia.integration.RestServiceTest;
import io.antmedia.muxer.MuxAdaptor;
import io.antmedia.rest.model.Result;
import io.antmedia.streamsource.StreamFetcher;
import io.antmedia.streamsource.StreamFetcherManager;
import io.antmedia.streamsource.StreamFetcherManager.StreamFetcherFactory;

@ContextConfiguration(locations = { "test.xml" })
public class StreamFetcherUnitTest extends AbstractJUnit4SpringContextTests {
Expand Down Expand Up @@ -104,7 +111,6 @@ public void before() {
//reset values in the bean
getAppSettings().setMp4MuxingEnabled(defaultSettings.isMp4MuxingEnabled());
getAppSettings().setHlsMuxingEnabled(defaultSettings.isHlsMuxingEnabled());
getAppSettings().setAddDateTimeToMp4FileName(false);

getAppSettings().setMp4MuxingEnabled(true);
getAppSettings().setAddDateTimeToMp4FileName(defaultSettings.isAddDateTimeToMp4FileName());
Expand Down Expand Up @@ -148,7 +154,10 @@ public void testBugUpdateStreamFetcherStatus() {
//set mapdb datastore to stream fetcher because in memory datastore just have references and updating broadcst
// object updates the reference in inmemorydatastore
app.getStreamFetcherManager().setDatastore(dataStore);


app.getStreamFetcherManager().setRestartStreamAutomatically(false);
app.getStreamFetcherManager().setStreamCheckerInterval(5000);

app.getStreamFetcherManager().getStreamFetcherList().clear();

assertEquals(0, app.getStreamFetcherManager().getStreamFetcherList().size());
Expand Down Expand Up @@ -216,6 +225,85 @@ public void testBugUpdateStreamFetcherStatus() {
logger.info("leaving testBugUpdateStreamFetcherStatus");

}


@Test
public void testRestartPeriodStreamFetcher() {

try {
//Create Stream Fetcher Manager
QuartzSchedulingService scheduler = (QuartzSchedulingService) applicationContext.getBean(QuartzSchedulingService.BEAN_NAME);

InMemoryDataStore memoryDataStore = new InMemoryDataStore("testdb");



//Create a mock StreamFetcher and add it to StreamFetcherManager
StreamFetcher streamFetcher = Mockito.mock(StreamFetcher.class);
Broadcast stream = Mockito.mock(Broadcast.class);

stream.setStreamId(String.valueOf((Math.random() * 100000)));

stream.setStreamUrl("anyurl");
streamFetcher.setStream(stream);
when(streamFetcher.getStream()).thenReturn(stream);
when(streamFetcher.isStreamAlive()).thenReturn(true);
when(streamFetcher.getCameraError()).thenReturn(new Result(true));

StreamFetcherFactory factory = mock(StreamFetcherFactory.class);
when(factory.make(stream, appScope)).thenReturn(streamFetcher);

StreamFetcherManager fetcherManager = new StreamFetcherManager(scheduler, memoryDataStore, appScope, factory);

//set checker interval to 3 seconds
fetcherManager.setStreamCheckerInterval(4000);

//set restart period to 5 seconds
fetcherManager.setRestartStreamFetcherPeriod(5);

//Start stream fetcher
Result result = fetcherManager.startStreaming(stream);
assertTrue(result.isSuccess());

//wait 10-12 seconds
Thread.sleep(13000);

//check that stream fetcher stop and start stream is called 4 times
verify(streamFetcher, times(2)).stopStream();

//it is +1 because it is called at first start
verify(streamFetcher, times(3)).startStream();
verify(streamFetcher, times(3)).startStream();

//set restart period to 0 seconds
fetcherManager.setRestartStreamFetcherPeriod(0);

//wait 10-12 seconds
Thread.sleep(13000);

//check that stream fetcher stop and start stream is not called
verify(streamFetcher, times(2)).stopStream();
verify(streamFetcher, times(3)).startStream();

//set restart period to 0 seconds
fetcherManager.setRestartStreamFetcherPeriod(5);

//wait 10-12 seconds
Thread.sleep(13000);

//check that stream fetcher stop and start stream is not called
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a separate test for each verifying a feature. Ex: each verify after Thread.sleeps look like testing a different thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is a kind of scenario testing. There is periodic task in StreamFetcher, we change some metrics on the fly and verify periodic task is working properly. Change again some metrics and verify test case again. This is real use case scenario. We need to sleep sometime to make sure the periodic task do its job.

verify(streamFetcher, atLeast(4)).stopStream();
verify(streamFetcher, atLeast(5)).startStream();

fetcherManager.setRestartStreamFetcherPeriod(0);

} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}


}

@Test
public void testThreadStopStart() {
Expand Down Expand Up @@ -698,9 +786,8 @@ public void testHLSFlagResult() {
try {
String textInFile;

startCameraEmulator();

Broadcast newCam = new Broadcast("streamSource", "127.0.0.1:8080", "admin", "admin", "rtsp://127.0.0.1:6554/test.flv",
Broadcast newCam = new Broadcast("streamSource", "127.0.0.1:8080", "admin", "admin", "src/test/resources/test.ts",
AntMediaApplicationAdapter.STREAM_SOURCE);

assertNotNull(newCam.getStreamUrl());
Expand All @@ -710,6 +797,9 @@ public void testHLSFlagResult() {
assertNotNull(newCam.getStreamId());

StreamFetcher fetcher = new StreamFetcher(newCam, appScope);


fetcher.setRestartStream(false);

assertFalse(fetcher.isThreadActive());
assertFalse(fetcher.isStreamAlive());
Expand Down Expand Up @@ -758,8 +848,6 @@ public void testHLSFlagResult() {

assertFalse(textInFile.contains("EXT-X-ENDLIST"));

stopCameraEmulator();

getInstance().getDataStore().delete(id);
}
catch (Exception e) {
Expand Down
Loading