Skip to content

Commit

Permalink
- start implementing round-trip pubsub monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ritzalam committed Aug 18, 2015
1 parent 2e7238a commit 8be746b
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 201 deletions.
Expand Up @@ -23,7 +23,7 @@ class RedisPublisher(val system: ActorSystem) extends SystemConfiguration {

// publish after 2 seconds every 2 or 5 seconds
//system.scheduler.schedule(2 seconds, 2 seconds)(redis.publish("time", System.currentTimeMillis()))
// system.scheduler.schedule(2 seconds, 5 seconds)(redis.publish("bigbluebutton:to-bbb-apps:users", "pattern value"))
//system.scheduler.schedule(10 seconds, 5 seconds)(redis.publish("bigbluebutton:to-bbb-apps:users", "pattern value"))

def publish(channel: String, data: String) {
println("PUBLISH TO [" + channel + "]: \n [" + data + "]")
Expand Down
8 changes: 4 additions & 4 deletions bbb-common-message/build.sbt
Expand Up @@ -4,7 +4,7 @@ name := "bbb-common-message"

organization := "org.bigbluebutton"

version := "0.0.12"
version := "0.0.13-SNAPSHOT"

// We want to have our jar files in lib_managed dir.
// This way we'll have the right path when we import
Expand Down Expand Up @@ -48,20 +48,20 @@ autoScalaLibrary := false
* publish to the local maven repo using "sbt publish"
*/
// Uncomment this to publish to local maven repo while commenting out the nexus repo
//publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository")))
publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository")))


// Comment this out when publishing to local maven repo using SNAPSHOT version.
// To push to sonatype "sbt publishSigned"

/*
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}

*/

// Enables publishing to maven repo
publishMavenStyle := true
Expand Down
@@ -0,0 +1,11 @@
package org.bigbluebutton.common.messages;

import org.bigbluebutton.common.messages.payload.PubSubPingMessagePayload;

public class PubSubPingMessage implements IBigBlueButtonMessage {

public static final String PUBSUB_PING = "BbbPubSubPingMessage";

public MessageHeader header;
public PubSubPingMessagePayload payload;
}
@@ -0,0 +1,11 @@
package org.bigbluebutton.common.messages;

import org.bigbluebutton.common.messages.payload.PubSubPingMessagePayload;

public class PubSubPongMessage implements IBigBlueButtonMessage {

public static final String PUBSUB_PONG = "BbbPubSubPongMessage";

public MessageHeader header;
public PubSubPingMessagePayload payload;
}
@@ -1,5 +1,7 @@
package org.bigbluebutton.common.messages;

import org.bigbluebutton.common.messages.payload.StartCustomPollRequestMessagePayload;

public class StartCustomPollRequestMessage implements IBigBlueButtonMessage {

public static final String START_CUSTOM_POLL_REQUEST = "start_custom_poll_request_message";
Expand Down
Expand Up @@ -4,8 +4,6 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
Expand Down
@@ -0,0 +1,6 @@
package org.bigbluebutton.common.messages.payload;

public class PubSubPingMessagePayload {
public String system;
public Long timestamp;
}
@@ -1,4 +1,4 @@
package org.bigbluebutton.common.messages;
package org.bigbluebutton.common.messages.payload;

import java.util.ArrayList;

Expand Down
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;

import org.bigbluebutton.common.messages.payload.StartCustomPollRequestMessagePayload;
import org.junit.*;

import com.google.gson.Gson;
Expand Down
7 changes: 3 additions & 4 deletions bigbluebutton-apps/build.gradle
Expand Up @@ -88,7 +88,6 @@ dependencies {
providedCompile 'org.slf4j:jul-to-slf4j:1.7.9@jar'
providedCompile 'org.slf4j:slf4j-api:1.7.9@jar'

compile "redis.clients:jedis:2.1.0"
compile "org.codehaus.jackson:jackson-core-asl:$jacksonVersion"
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
compile "javax.servlet:com.springsource.javax.servlet.jsp.jstl:1.2.0"
Expand All @@ -106,13 +105,13 @@ dependencies {
compile 'org.easymock:easymock:2.4@jar'

//redis
//compile 'redis.clients:jedis:2.0.0'
providedCompile 'commons-pool:commons-pool:1.5.6'
compile "redis.clients:jedis:2.7.2"
compile 'org.apache.commons:commons-pool2:2.3'

compile 'com.google.code.gson:gson:1.7.1'
providedCompile 'org.apache.commons:commons-lang3:3.2'

compile 'org.bigbluebutton:bbb-common-message:0.0.12'
compile 'org.bigbluebutton:bbb-common-message:0.0.13-SNAPSHOT'
}

test {
Expand Down
Expand Up @@ -7,11 +7,18 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.bigbluebutton.common.messages.MessageHeader;
import org.bigbluebutton.common.messages.MessagingConstants;
import org.bigbluebutton.common.messages.PubSubPingMessage;
import org.bigbluebutton.common.messages.payload.PubSubPingMessagePayload;
import org.bigbluebutton.red5.client.messaging.ConnectionInvokerService;
import org.bigbluebutton.red5.client.messaging.DisconnectAllMessage;
import org.bigbluebutton.red5.pubsub.redis.MessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

public class BbbAppsIsAliveMonitorService {
private static Logger log = LoggerFactory.getLogger(BbbAppsIsAliveMonitorService.class);

Expand All @@ -27,6 +34,12 @@ public class BbbAppsIsAliveMonitorService {
private ConnectionInvokerService service;
private Long lastKeepAliveMessage = 0L;

private MessageSender sender;

public void setMessageSender(MessageSender sender) {
this.sender = sender;
}

public void setConnectionInvokerService(ConnectionInvokerService s) {
this.service = s;
}
Expand Down Expand Up @@ -99,6 +112,20 @@ class KeepAliveTask implements Runnable {
public void run() {
CheckIsAliveTimer ping = new CheckIsAliveTimer();
queueMessage(ping);

PubSubPingMessage msg = new PubSubPingMessage();
MessageHeader header = new MessageHeader();
header.name = PubSubPingMessage.PUBSUB_PING;
header.timestamp = System.nanoTime();
header.replyTo = "BbbRed5";
header.version = "0.0.1";
PubSubPingMessagePayload payload = new PubSubPingMessagePayload();
payload.system = "BbbAppsRed5";
payload.timestamp = System.currentTimeMillis();
msg.header = header;
msg.payload = payload;
Gson gson = new Gson();
sender.send(MessagingConstants.TO_SYSTEM_CHANNEL, gson.toJson(msg));
}
}
}

This file was deleted.

Expand Up @@ -2,25 +2,25 @@

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.bigbluebutton.common.messages.MessagingConstants;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

public class MessageReceiver {
private static Logger log = Red5LoggerFactory.getLogger(MessageReceiver.class, "bigbluebutton");

private ReceivedMessageHandler handler;

private JedisPool redisPool;
private Jedis jedis;
private volatile boolean receiveMessage = false;

private final Executor msgReceiverExec = Executors.newSingleThreadExecutor();

private String host;
private int port;

public void stop() {
receiveMessage = false;
}
Expand All @@ -29,7 +29,8 @@ public void start() {
log.info("Ready to receive messages from Redis pubsub.");
try {
receiveMessage = true;
final Jedis jedis = redisPool.getResource();
jedis = new Jedis(host, port);
jedis.clientSetname("red5-psubscriber");

Runnable messageReceiver = new Runnable() {
public void run() {
Expand All @@ -45,8 +46,12 @@ public void run() {
}
}

public void setRedisPool(JedisPool redisPool){
this.redisPool = redisPool;
public void setHost(String host){
this.host = host;
}

public void setPort(int port) {
this.port = port;
}

public void setMessageHandler(ReceivedMessageHandler handler) {
Expand Down

0 comments on commit 8be746b

Please sign in to comment.