Skip to content

Commit

Permalink
- change poll config for jedis
Browse files Browse the repository at this point in the history
 - attempt to reconnect redis pubsub subscriber if disconnected
  • Loading branch information
ritzalam committed Aug 23, 2015
1 parent 0e2e79f commit a63a211
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 10 deletions.
Expand Up @@ -29,9 +29,21 @@ public class RedisDispatcher implements Recorder {
private JedisPool redisPool;

public RedisDispatcher(String host, int port, String password) {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(32);
config.setMaxIdle(8);
config.setMinIdle(1);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setNumTestsPerEvictionRun(12);
config.setMaxWaitMillis(5000);
config.setTimeBetweenEvictionRunsMillis(60000);
config.setBlockWhenExhausted(true);

// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redisPool = new JedisPool(new GenericObjectPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, null,
redisPool = new JedisPool(config, host, port, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE, "BbbAppsAkkaRec");
}

Expand Down
Expand Up @@ -169,6 +169,7 @@ class BigBlueButtonActor(val system: ActorSystem, recorderApp: RecorderApplicati
}

private def handlePubSubPingMessage(msg: PubSubPing): Unit = {
//log.info("PubSubPing from [" + msg.system + "]")
outGW.send(new PubSubPong(msg.system, msg.timestamp))
}

Expand Down
1 change: 1 addition & 0 deletions bbb-voice/.gitignore 100644 → 100755
Expand Up @@ -2,3 +2,4 @@
.classpath
.settings
build
/bin/
Expand Up @@ -102,13 +102,14 @@ public void run() {
}

private void processKeepAliveMessage(KeepAliveMessage msg) {
//log.info("BBB Apps Red5 pubsub pong!" + msg.system);
lastKeepAliveMessage = System.currentTimeMillis();
}

private void processCheckIsAliveTimer(CheckIsAliveTimer msg) {
Long now = System.currentTimeMillis();

if (lastKeepAliveMessage != 0 && (now - lastKeepAliveMessage > 10000)) {
if (lastKeepAliveMessage != 0 && (now - lastKeepAliveMessage > 30000)) {
log.error("BBB Apps Red5 pubsub error!");
service.sendMessage(new DisconnectAllMessage());
}
Expand Down
Expand Up @@ -2,11 +2,14 @@

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.bigbluebutton.common.messages.MessagingConstants;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class MessageReceiver {
private static Logger log = Red5LoggerFactory.getLogger(MessageReceiver.class, "bigbluebutton");
Expand Down Expand Up @@ -37,8 +40,14 @@ public void start() {
Runnable messageReceiver = new Runnable() {
public void run() {
if (receiveMessage) {
jedis.psubscribe(new PubSubListener(),
MessagingConstants.FROM_BBB_APPS_PATTERN);
try {
jedis.psubscribe(new PubSubListener(),
MessagingConstants.FROM_BBB_APPS_PATTERN);
} catch(JedisConnectionException ex) {
log.warn("Exception on Jedis connection. Resubscribing to pubsub.");
start();
}

}
}
};
Expand Down
Expand Up @@ -27,12 +27,25 @@ public class MessageSender {

public void stop() {
sendMessage = false;
redisPool.destroy();
}

public void start() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(32);
config.setMaxIdle(8);
config.setMinIdle(1);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setNumTestsPerEvictionRun(12);
config.setMaxWaitMillis(5000);
config.setTimeBetweenEvictionRunsMillis(60000);
config.setBlockWhenExhausted(true);

// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redisPool = new JedisPool(new GenericObjectPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, null,
redisPool = new JedisPool(config, host, port, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE, "BbbRed5AppsPub");

log.info("Redis message publisher starting!");
Expand Down
Expand Up @@ -121,11 +121,11 @@ public void run() {
private void processMessage(final KeepAliveMessage msg) {
Runnable task = new Runnable() {
public void run() {
if (msg instanceof KeepAlivePing) {
processPing((KeepAlivePing) msg);
} else if (msg instanceof KeepAlivePong) {
processPong((KeepAlivePong) msg);
}
if (msg instanceof KeepAlivePing) {
processPing((KeepAlivePing) msg);
} else if (msg instanceof KeepAlivePong) {
processPong((KeepAlivePong) msg);
}
}
};

Expand Down

0 comments on commit a63a211

Please sign in to comment.