Skip to content

Commit

Permalink
Merge pull request #2737 from ritzalam/redis-pubsub-keepalive
Browse files Browse the repository at this point in the history
Redis pubsub keepalive
  • Loading branch information
ffdixon committed Aug 21, 2015
2 parents c70dcfe + 39baece commit 1527c51
Show file tree
Hide file tree
Showing 57 changed files with 643 additions and 905 deletions.
5 changes: 2 additions & 3 deletions akka-bbb-apps/build.sbt
Expand Up @@ -48,12 +48,11 @@ libraryDependencies ++= {
"commons-codec" % "commons-codec" % "1.8",
"joda-time" % "joda-time" % "2.3",
"com.google.code.gson" % "gson" % "1.7.1",
"redis.clients" % "jedis" % "2.1.0",
"redis.clients" % "jedis" % "2.7.2",
"org.apache.commons" % "commons-lang3" % "3.2",
"org.bigbluebutton" % "bbb-common-message" % "0.0.12"
"org.bigbluebutton" % "bbb-common-message" % "0.0.13"
)}


seq(Revolver.settings: _*)

scalariformSettings
Expand Down
Expand Up @@ -2,6 +2,7 @@

import java.util.HashMap;
import java.util.Map;

import org.bigbluebutton.common.messages.CreateMeetingMessage;
import org.bigbluebutton.common.messages.DestroyMeetingMessage;
import org.bigbluebutton.common.messages.EndMeetingMessage;
Expand All @@ -10,6 +11,7 @@
import org.bigbluebutton.common.messages.KeepAliveMessage;
import org.bigbluebutton.common.messages.MessageFromJsonConverter;
import org.bigbluebutton.common.messages.MessagingConstants;
import org.bigbluebutton.common.messages.PubSubPingMessage;
import org.bigbluebutton.common.messages.RegisterUserMessage;
import org.bigbluebutton.common.messages.UserConnectedToGlobalAudio;
import org.bigbluebutton.common.messages.UserDisconnectedFromGlobalAudio;
Expand All @@ -19,6 +21,8 @@
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class MeetingMessageReceiver implements MessageHandler {
private static final Logger LOG = LoggerFactory.getLogger(MeetingMessageReceiver.class);
Expand Down Expand Up @@ -96,16 +100,32 @@ else if (msg instanceof GetAllMeetingsRequest) {
System.out.println("Failed to decode message: [" + message + "]");
}
} else if (channel.equalsIgnoreCase(MessagingConstants.TO_SYSTEM_CHANNEL)) {
IBigBlueButtonMessage msg = MessageFromJsonConverter.convert(message);

if (msg != null) {
if (msg instanceof KeepAliveMessage) {
KeepAliveMessage emm = (KeepAliveMessage) msg;
bbbGW.isAliveAudit(emm.keepAliveId);
}
} else {
System.out.println("Unknown message: [" + message + "]");
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);

if (obj.has("header") && obj.has("payload")) {
JsonObject header = (JsonObject) obj.get("header");
if (header.has("name")) {
String messageName = header.get("name").getAsString();
if (PubSubPingMessage.PUBSUB_PING.equals(messageName)){
Gson gson = new Gson();
PubSubPingMessage msg = gson.fromJson(message, PubSubPingMessage.class);
bbbGW.handleBigBlueButtonMessage(msg);
} else {
IBigBlueButtonMessage msg = MessageFromJsonConverter.convert(message);

if (msg != null) {
if (msg instanceof KeepAliveMessage) {
KeepAliveMessage emm = (KeepAliveMessage) msg;
bbbGW.isAliveAudit(emm.keepAliveId);
}
} else {
System.out.println("Unknown message: [" + message + "]");
}
}
}
}

}
}

Expand Down

This file was deleted.

Expand Up @@ -18,20 +18,21 @@
*/
package org.bigbluebutton.core.service.recorder;

import org.apache.commons.pool.impl.GenericObjectPool;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Protocol;

public class RedisDispatcher implements Recorder {

private static final String COLON=":";
private JedisPool redisPool;
private GenericObjectPoolConfigWrapper config;


public RedisDispatcher(String host, int port, String password) {
setupConfig();
redisPool = new JedisPool(config.getConfig(), host, port);
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
redisPool = new JedisPool(new GenericObjectPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE, "BbbAppsAkkaRec");
}

@Override
Expand All @@ -42,22 +43,8 @@ public void record(String session, RecordEvent message) {
jedis.hmset("recording" + COLON + session + COLON + msgid, message.toMap());
jedis.rpush("meeting" + COLON + session + COLON + "recordings", msgid.toString());
} finally {
redisPool.returnResource(jedis);
jedis.close();
}
}

private void setupConfig() {
config = new GenericObjectPoolConfigWrapper();
config.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_FAIL);
config.setMaxActive(12);
config.setMaxIdle(6);
config.setMinIdle(1);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setNumTestsPerEvictionRun(12);
config.setTimeBetweenEvictionRunsMillis(60000);
config.setMaxWait(5000);
}

}
Expand Up @@ -37,6 +37,7 @@ class BigBlueButtonActor(val system: ActorSystem, recorderApp: RecorderApplicati
case msg: CreateMeeting => handleCreateMeeting(msg)
case msg: DestroyMeeting => handleDestroyMeeting(msg)
case msg: KeepAliveMessage => handleKeepAliveMessage(msg)
case msg: PubSubPing => handlePubSubPingMessage(msg)
case msg: ValidateAuthToken => handleValidateAuthToken(msg)
case msg: GetAllMeetingsRequest => handleGetAllMeetingsRequest(msg)
case msg: UserJoinedVoiceConfMessage => handleUserJoinedVoiceConfMessage(msg)
Expand Down Expand Up @@ -167,6 +168,10 @@ class BigBlueButtonActor(val system: ActorSystem, recorderApp: RecorderApplicati
outGW.send(new KeepAliveMessageReply(msg.aliveID))
}

private def handlePubSubPingMessage(msg: PubSubPing): Unit = {
outGW.send(new PubSubPong(msg.system, msg.timestamp))
}

private def handleDestroyMeeting(msg: DestroyMeeting) {
log.info("BBBActor received DestroyMeeting message for meeting id [" + msg.meetingID + "]")
meetings.get(msg.meetingID) match {
Expand Down
Expand Up @@ -15,8 +15,9 @@ import scala.concurrent.duration._
import scala.util.Success
import scala.util.Failure
import org.bigbluebutton.core.service.recorder.RecorderApplication
import org.bigbluebutton.common.messages.IBigBlueButtonMessage;
import org.bigbluebutton.common.messages.IBigBlueButtonMessage
import org.bigbluebutton.common.messages.StartCustomPollRequestMessage
import org.bigbluebutton.common.messages.PubSubPingMessage

class BigBlueButtonInGW(val system: ActorSystem, recorderApp: RecorderApplication, messageSender: MessageSender, voiceEventRecorder: VoiceEventRecorder) extends IBigBlueButtonInGW {
val log = system.log
Expand All @@ -27,6 +28,9 @@ class BigBlueButtonInGW(val system: ActorSystem, recorderApp: RecorderApplicatio
case msg: StartCustomPollRequestMessage => {
bbbActor ! new StartCustomPollRequest(msg.payload.meetingId, msg.payload.requesterId, msg.payload.pollType, msg.payload.answers)
}
case msg: PubSubPingMessage => {
bbbActor ! new PubSubPing(msg.payload.system, msg.payload.timestamp)
}
}
}

Expand Down
Expand Up @@ -26,6 +26,7 @@ import org.bigbluebutton.common.messages.GetCurrentLayoutReplyMessage
import org.bigbluebutton.common.messages.BroadcastLayoutMessage
import org.bigbluebutton.common.messages.LockLayoutMessage
import org.bigbluebutton.core.pubsub.senders.WhiteboardMessageToJsonConverter
import org.bigbluebutton.common.converters.ToJsonEncoder

object MessageSenderActor {
def props(meetingId: String, msgSender: MessageSender): Props =
Expand All @@ -35,6 +36,8 @@ object MessageSenderActor {
class MessageSenderActor(val meetingId: String, val service: MessageSender)
extends Actor with ActorLogging {

val encoder = new ToJsonEncoder()

def receive = {
case msg: GetChatHistoryReply => handleGetChatHistoryReply(msg)
case msg: SendPublicMessageEvent => handleSendPublicMessageEvent(msg)
Expand All @@ -48,6 +51,7 @@ class MessageSenderActor(val meetingId: String, val service: MessageSender)
case msg: MeetingHasEnded => handleMeetingHasEnded(msg)
case msg: MeetingDestroyed => handleMeetingDestroyed(msg)
case msg: KeepAliveMessageReply => handleKeepAliveMessageReply(msg)
case msg: PubSubPong => handlePubSubPong(msg)
case msg: StartRecording => handleStartRecording(msg)
case msg: StopRecording => handleStopRecording(msg)
case msg: GetAllMeetingsReply => handleGetAllMeetingsReply(msg)
Expand Down Expand Up @@ -142,6 +146,11 @@ class MessageSenderActor(val meetingId: String, val service: MessageSender)
service.send(MessagingConstants.FROM_MEETING_CHANNEL, json)
}

private def handlePubSubPong(msg: PubSubPong) {
val json = encoder.encodePubSubPongMessage(msg.system, msg.timestamp)
service.send(MessagingConstants.FROM_SYSTEM_CHANNEL, json)
}

private def handleKeepAliveMessageReply(msg: KeepAliveMessageReply) {
val json = MeetingMessageToJsonConverter.keepAliveMessageReplyToJson(msg)
service.send(MessagingConstants.FROM_SYSTEM_CHANNEL, json)
Expand Down
Expand Up @@ -7,6 +7,8 @@ import org.bigbluebutton.core.MeetingProperties

trait InMessage { val meetingID: String }

case class PubSubPing(system: String, timestamp: Long)

case class IsMeetingActorAliveMessage(meetingId: String)
case class KeepAliveMessage(aliveID: String)
case class CreateMeeting(meetingID: String, mProps: MeetingProperties) extends InMessage
Expand Down
Expand Up @@ -25,6 +25,7 @@ case class MeetingDestroyed(meetingID: String) extends IOutMessage
case class DisconnectAllUsers(meetingID: String) extends IOutMessage
case class DisconnectUser(meetingID: String, userId: String) extends IOutMessage
case class KeepAliveMessageReply(aliveID: String) extends IOutMessage
case class PubSubPong(system: String, timestamp: Long) extends IOutMessage
case object IsAliveMessage extends IOutMessage

// Permissions
Expand Down
Expand Up @@ -9,6 +9,7 @@ import akka.actor.ActorRef
import akka.actor.actorRef2Scala
import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.core.pubsub.receivers.RedisMessageReceiver
import redis.api.servers.ClientSetname

object AppsRedisSubscriberActor extends SystemConfiguration {

Expand All @@ -28,6 +29,10 @@ class AppsRedisSubscriberActor(msgReceiver: RedisMessageReceiver, redisHost: Str
new InetSocketAddress(redisHost, redisPort),
channels, patterns) {

// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
write(ClientSetname("BbbAppsAkkaSub").encodedRequest)

def onMessage(message: Message) {
log.error(s"SHOULD NOT BE RECEIVING: $message")
}
Expand Down

0 comments on commit 1527c51

Please sign in to comment.