Skip to content

Commit

Permalink
- cancel the message sending task when it takes a long time to compl…
Browse files Browse the repository at this point in the history
…ete. The thread

   may have blocked when the rtmp connection was closed.
  • Loading branch information
ritzalam committed Nov 18, 2015
1 parent de71d17 commit 22b3a4e
Showing 1 changed file with 59 additions and 6 deletions.
Expand Up @@ -22,10 +22,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IConnection;
import org.red5.server.api.scope.IScope;
Expand All @@ -36,6 +41,7 @@
import org.red5.server.so.SharedObjectService;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;

import com.google.gson.Gson;

public class ConnectionInvokerService {
Expand All @@ -44,12 +50,14 @@ public class ConnectionInvokerService {
private final String CONN = "RED5-";
private static final int NTHREADS = 1;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
private static final Executor runExec = Executors.newFixedThreadPool(NTHREADS);
private static final ExecutorService runExec = Executors.newFixedThreadPool(3);

private BlockingQueue<ClientMessage> messages;
private volatile boolean sendMessages = false;
private IScope bbbAppScope;

private final long SEND_TIMEOUT = 5000000000L; // 5s

public ConnectionInvokerService() {
messages = new LinkedBlockingQueue<ClientMessage>();
}
Expand Down Expand Up @@ -79,6 +87,7 @@ public void run() {

public void stop() {
sendMessages = false;
runExec.shutdown();
}

public void sendMessage(final ClientMessage message) {
Expand Down Expand Up @@ -157,7 +166,7 @@ private void sendSharedObjectMessage(SharedObjectClientMessage msg) {
}
}
}

private void sendDirectMessage(final DirectClientMessage msg) {
if (log.isTraceEnabled()) {
Gson gson = new Gson();
Expand All @@ -177,11 +186,13 @@ public void run() {
List<Object> params = new ArrayList<Object>();
params.add(msg.getMessageName());
params.add(msg.getMessage());

if (log.isTraceEnabled()) {
Gson gson = new Gson();
String json = gson.toJson(msg.getMessage());
log.trace("Send direct message: " + msg.getMessageName() + " msg=" + json);
}

ServiceUtils.invokeOnConnection(conn, "onMessageFromServer", params.toArray());
}
} else {
Expand All @@ -191,9 +202,30 @@ public void run() {
}
}
};
runExec.execute(sender);
}

/**
* We need to add a way to cancel sending when the thread is blocked.
* Red5 uses a semaphore to guard the rtmp connection and we've seen
* instances where our thread is blocked preventing us from sending messages
* to other connections. (ralam nov 19, 2015)
*/
long endNanos = System.nanoTime() + SEND_TIMEOUT;
Future<?> f = runExec.submit(sender);
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
log.warn("ExecutionException while sending direct message on connection[" + sessionId + "]");
} catch (InterruptedException e) {
log.warn("Interrupted exception while sending direct message on connection[" + sessionId + "]");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("Timeout exception while sending direct message on connection[" + sessionId + "]");
f.cancel(true);
}
}

private void sendBroadcastMessage(final BroadcastClientMessage msg) {
if (log.isTraceEnabled()) {
Gson gson = new Gson();
Expand All @@ -217,7 +249,28 @@ public void run() {
}
}
};
runExec.execute(sender);

/**
* We need to add a way to cancel sending when the thread is blocked.
* Red5 uses a semaphore to guard the rtmp connection and we've seen
* instances where our thread is blocked preventing us from sending messages
* to other connections. (ralam nov 19, 2015)
*/
long endNanos = System.nanoTime() + SEND_TIMEOUT;
Future<?> f = runExec.submit(sender);
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
log.warn("ExecutionException while sending broadcast message[" + msg.getMessageName() + "]");
} catch (InterruptedException e) {
log.warn("Interrupted exception while sending direct message[" + msg.getMessageName() + "]");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("Timeout exception while sending direct message[" + msg.getMessageName() + "]");
f.cancel(true);
}
}

private IConnection getConnection(IScope scope, String userID) {
Expand Down

0 comments on commit 22b3a4e

Please sign in to comment.