Skip to content

Commit

Permalink
- remove defunct connection when client auto-reconnects.
Browse files Browse the repository at this point in the history
  • Loading branch information
ritzalam committed Oct 20, 2016
1 parent 5b2b0ed commit 5ba3405
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 83 deletions.
Expand Up @@ -175,7 +175,7 @@ object BreakoutRoomsUtil {
}

def joinParams(username: String, userId: String, isBreakout: Boolean, breakoutId: String,
password: String, redirect: Boolean): mutable.Map[String, String] = {
password: String, redirect: Boolean): mutable.Map[String, String] = {
val params = new collection.mutable.HashMap[String, String]
params += "fullName" -> urlEncode(username)
params += "userID" -> urlEncode(userId + "-" + breakoutId.substring(breakoutId.lastIndexOf("-") + 1));
Expand Down
Expand Up @@ -89,7 +89,7 @@ public void conferenceEventJoin(String uniqueId, String confName, int confSize,
System.out.println("User joined voice conference, user=[" + callerIdName + "], conf=[" +
confName + "] callerId=[" + callerId + "]");

VoiceUserJoinedEvent pj = new VoiceUserJoinedEvent(voiceUserId, memberId.toString(), confName, callerId, callerIdName, muted, speaking, "");
VoiceUserJoinedEvent pj = new VoiceUserJoinedEvent(voiceUserId, memberId.toString(), confName, callerId, callerIdName, muted, speaking, "none");
conferenceEventListener.handleConferenceEvent(pj);
}
}
Expand Down
Expand Up @@ -54,7 +54,7 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag
}

def onPMessage(pmessage: PMessage) {
log.debug(s"pattern message received: $pmessage")
// log.debug(s"pattern message received: $pmessage")

val msg = decoder.decodeMessage(pmessage.data)

Expand All @@ -76,4 +76,4 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag
def handleMessage(msg: String) {
log.warning("**** TODO: Handle pubsub messages. ****")
}
}
}
Expand Up @@ -72,7 +72,8 @@ public static UserJoinedVoiceConfMessage fromJson(String message) {
&& payload.has(CALLER_ID_NAME)
&& payload.has(CALLER_ID_NUM)
&& payload.has(MUTED)
&& payload.has(TALKING)) {
&& payload.has(TALKING)
&& payload.has(AVATAR_URL)) {
String voiceConfId = payload.get(VOICE_CONF_ID).getAsString();
String voiceUserId = payload.get(VOICE_USER_ID).getAsString();
String userId = payload.get(USER_ID).getAsString();
Expand Down
Expand Up @@ -106,7 +106,7 @@ private void handleDisconnectAllClientsMessage(DisconnectAllClientsMessage msg)

for (IConnection conn : conns) {
if (conn.isConnected()) {
String connId = (String) conn.getAttribute("INTERNAL_USER_ID");
String connId = (String) conn.getAttribute("USERID");
log.info("Disconnecting client=[{}] from meeting=[{}]", connId, msg.getMeetingId());
conn.close();
}
Expand Down Expand Up @@ -200,7 +200,7 @@ private IConnection getConnection(IScope scope, String userID) {
}
}

return null;
return null;
}

public IScope getScope(String meetingID) {
Expand Down
Expand Up @@ -4,19 +4,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.bigbluebutton.app.screenshare.messaging.redis.MessageSender;
//import org.bigbluebutton.app.screenshare.messaging.redis.MessagingConstants;
import org.bigbluebutton.common.messages.AllowUserToShareDesktopRequest;
import org.bigbluebutton.common.messages.MessagingConstants;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IConnection;
import org.red5.server.api.Red5;
import org.slf4j.Logger;

import com.google.gson.Gson;


public class Red5AppService {
private static Logger log = Red5LoggerFactory.getLogger(Red5AppService.class, "screenshare");

Expand Down Expand Up @@ -45,8 +39,22 @@ public void setUserId(Map<String, Object> msg) {
Set<IConnection> conns = Red5.getConnectionLocal().getScope().getClientConnections();
for (IConnection conn : conns) {
String connUserId = (String) conn.getAttribute("USERID");
if (connUserId != null && connUserId.equals(userId) && !conn.getSessionId().equals(sessionId)) {
String connSessionId = conn.getSessionId();
if (connUserId != null && connUserId.equals(userId) && !connSessionId.equals(sessionId)) {
conn.removeAttribute("USERID");
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("userId", userId);
logData.put("oldConnId", connSessionId);
logData.put("newConnId", sessionId);
logData.put("event", "removing_defunct_connection");
logData.put("description", "Removing defunct connection BBB Screenshare.");

Gson gson = new Gson();
String logStr = gson.toJson(logData);

log.info("Removing defunct connection: data={}", logStr);

}
}

Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.bigbluebutton.red5.pubsub.MessagePublisher;
Expand Down Expand Up @@ -64,31 +65,57 @@ public boolean appConnect(IConnection conn, Object[] params) {
}

@Override
public boolean roomConnect(IConnection conn, Object[] params) {
log.info("BBB Video roomConnect");
String meetingId = ((String) params[0]).toString();
String userId = ((String) params[1]).toString();
Red5.getConnectionLocal().setAttribute("MEETING_ID", meetingId);
Red5.getConnectionLocal().setAttribute("USERID", userId);
public boolean roomConnect(IConnection connection, Object[] params) {
log.info("BBB Video roomConnect");
String meetingId = ((String) params[0]).toString();
String userId = ((String) params[1]).toString();

Red5.getConnectionLocal().setAttribute("MEETING_ID", meetingId);
Red5.getConnectionLocal().setAttribute("USERID", userId);

String connType = getConnectionType(Red5.getConnectionLocal().getType());
String connId = Red5.getConnectionLocal().getSessionId();

String sessionId = Red5.getConnectionLocal().getSessionId();
/**
* Find if there are any other connections owned by this user. If we find one,
* that means that the connection is old and the user reconnected. Clear the
* userId attribute so that messages would not be sent in the defunct connection.
*/
Set<IConnection> conns = Red5.getConnectionLocal().getScope().getClientConnections();
for (IConnection conn : conns) {
String connUserId = (String) conn.getAttribute("USERID");
String connSessionId = conn.getSessionId();
if (connUserId != null && connUserId.equals(userId) && !connSessionId.equals(sessionId)) {
conn.removeAttribute("USERID");
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("userId", userId);
logData.put("oldConnId", connSessionId);
logData.put("newConnId", sessionId);
logData.put("event", "removing_defunct_connection");
logData.put("description", "Removing defunct connection BBB Video.");

Gson gson = new Gson();
String logStr = gson.toJson(logData);

log.info("Removing defunct connection: data={}", logStr);

}
}

Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", meetingId);
logData.put("userId", userId);
logData.put("connType", connType);
logData.put("connId", connId);
logData.put("connId", sessionId);
logData.put("event", "user_joining_bbb_video");
logData.put("description", "User joining BBB Video.");

Gson gson = new Gson();
String logStr = gson.toJson(logData);
String logStr = gson.toJson(logData);

log.info("User joining bbb-video: data={}", logStr);
return super.roomConnect(conn, params);

return super.roomConnect(connection, params);
}

private String getConnectionType(String connType) {
Expand Down
6 changes: 2 additions & 4 deletions bbb.sh
Expand Up @@ -7,7 +7,7 @@ BBB_DIR=$(pwd)

cd $BBB_DIR

DESKSHARE=$BBB_DIR/deskshare
DESKSHARE=$BBB_DIR/bbb-screenshare
VOICE=$BBB_DIR/bbb-voice
VIDEO=$BBB_DIR/bbb-video
APPS=$BBB_DIR/bigbluebutton-apps
Expand All @@ -28,10 +28,8 @@ gradle resolveDeps
gradle clean war deploy

echo "Building deskshare"
cd $DESKSHARE
gradle resolveDeps
cd $DESKSHARE/app
gradle clean war deploy
./deploy.sh

cd $BBB_DIR

Expand Down
Expand Up @@ -42,8 +42,6 @@ public class BigBlueButtonApplication extends MultiThreadedApplicationAdapter {
private ConnectionInvokerService connInvokerService;
private MessagePublisher red5InGW;

private final UserConnectionMapper userConnections = new UserConnectionMapper();

private final String APP = "BBB";
private final String CONN = "RED5-";

Expand Down Expand Up @@ -127,7 +125,36 @@ boolean record = (Boolean)params[4];
}

String userId = internalUserID;
String sessionId = CONN + userId;
String sessionId = Red5.getConnectionLocal().getSessionId();
String connType = getConnectionType(Red5.getConnectionLocal().getType());

/**
* Find if there are any other connections owned by this user. If we find one,
* that means that the connection is old and the user reconnected. Clear the
* userId attribute so that messages would not be sent in the defunct connection.
*/
Set<IConnection> conns = Red5.getConnectionLocal().getScope().getClientConnections();
for (IConnection conn : conns) {
String connUserId = (String) conn.getAttribute("INTERNAL_USER_ID");
String connSessionId = conn.getSessionId();
if (connUserId != null && connUserId.equals(userId) && !connSessionId.equals(sessionId)) {
conn.removeAttribute("INTERNAL_USER_ID");
Map<String, Object> logData = new HashMap<String, Object>();
logData.put("meetingId", room);
logData.put("userId", userId);
logData.put("oldConnId", connSessionId);
logData.put("newConnId", sessionId);
logData.put("event", "removing_defunct_connection");
logData.put("description", "Removing defunct connection BBB Apps.");

Gson gson = new Gson();
String logStr = gson.toJson(logData);

log.info("Removing defunct connection: data={}", logStr);
}
}


BigBlueButtonSession bbbSession = new BigBlueButtonSession(room, internalUserID, username, role,
voiceBridge, record, externalUserID, muted, sessionId);
connection.setAttribute(Constants.SESSION, bbbSession);
Expand All @@ -140,8 +167,7 @@ boolean record = (Boolean)params[4];
red5InGW.initAudioSettings(room, internalUserID, muted);

String meetingId = bbbSession.getRoom();

String connType = getConnectionType(Red5.getConnectionLocal().getType());

String userFullname = bbbSession.getUsername();
String connId = Red5.getConnectionLocal().getSessionId();

Expand All @@ -165,9 +191,7 @@ boolean record = (Boolean)params[4];
String logStr = gson.toJson(logData);

log.info("User joining bbb-apps: data={}", logStr);

userConnections.addUserConnection(userId, connId);


return super.roomConnect(connection, params);

}
Expand All @@ -186,8 +210,7 @@ private String getConnectionType(String connType) {
public void roomDisconnect(IConnection conn) {

String remoteHost = Red5.getConnectionLocal().getRemoteAddress();
int remotePort = Red5.getConnectionLocal().getRemotePort();
String clientId = Red5.getConnectionLocal().getClient().getId();
int remotePort = Red5.getConnectionLocal().getRemotePort();

BigBlueButtonSession bbbSession = (BigBlueButtonSession) Red5.getConnectionLocal().getAttribute(Constants.SESSION);

Expand All @@ -212,15 +235,9 @@ public void roomDisconnect(IConnection conn) {

Gson gson = new Gson();
String logStr = gson.toJson(logData);

boolean removeUser = userConnections.userDisconnected(userId, connId);

if (removeUser) {
log.info("User leaving bbb-apps: data={}", logStr);
red5InGW.userLeft(bbbSession.getRoom(), getBbbSession().getInternalUserID(), sessionId);
} else {
log.info("User not leaving bbb-apps but just disconnected: data={}", logStr);
}

log.info("User leaving bbb-apps: data={}", logStr);
red5InGW.userLeft(bbbSession.getRoom(), getBbbSession().getInternalUserID(), sessionId);

super.roomDisconnect(conn);
}
Expand Down
Expand Up @@ -175,13 +175,13 @@ private void sendDirectMessage(final DirectClientMessage msg) {
log.trace("Handle direct message: " + msg.getMessageName() + " msg=" + json);
}

final String sessionId = CONN + msg.getUserID();
final String userId = msg.getUserID();
Runnable sender = new Runnable() {
public void run() {
IScope meetingScope = getScope(msg.getMeetingID());
if (meetingScope != null) {

IConnection conn = getConnection(meetingScope, sessionId);
IConnection conn = getConnection(meetingScope, userId);
if (conn != null) {
if (conn.isConnected()) {
List<Object> params = new ArrayList<Object>();
Expand All @@ -197,7 +197,7 @@ public void run() {
ServiceUtils.invokeOnConnection(conn, "onMessageFromServer", params.toArray());
}
} else {
log.info("Cannot send message=[" + msg.getMessageName() + "] to [" + sessionId
log.info("Cannot send message=[" + msg.getMessageName() + "] to [" + userId
+ "] as no such session on meeting=[" + msg.getMeetingID() + "]");
}
}
Expand All @@ -217,12 +217,12 @@ public void run() {
long timeLeft = endNanos - System.nanoTime();
f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
log.warn("ExecutionException while sending direct message on connection[" + sessionId + "]");
log.warn("ExecutionException while sending direct message on connection[" + userId + "]");
} catch (InterruptedException e) {
log.warn("Interrupted exception while sending direct message on connection[" + sessionId + "]");
log.warn("Interrupted exception while sending direct message on connection[" + userId + "]");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("Timeout exception while sending direct message on connection[" + sessionId + "]");
log.warn("Timeout exception while sending direct message on connection[" + userId + "]");
f.cancel(true);
}
}
Expand Down Expand Up @@ -275,31 +275,15 @@ public void run() {
}

private IConnection getConnection(IScope scope, String userID) {
Set<IConnection> conns = new HashSet<IConnection>();
for (IConnection conn : scope.getClientConnections()) {
String connID = (String) conn.getAttribute("USER_SESSION_ID");
String connID = (String) conn.getAttribute("INTERNAL_USER_ID");
if (connID != null && connID.equals(userID)) {
conns.add(conn);
return conn;
}
}
if (!conns.isEmpty()) {
return getLastConnection(conns);
} else {
log.warn("Failed to get connection for userId = " + userID);
return null;
}
}

private IConnection getLastConnection(Set<IConnection> conns) {
IConnection conn = null;
for (IConnection c : conns) {
if (conn == null) {
conn = c;
} else if ((long) conn.getAttribute("TIMESTAMP") < (long) c.getAttribute("TIMESTAMP")) {
conn = c;
}
}
return conn;
log.warn("Failed to get connection for userId = " + userID);
return null;
}

public IScope getScope(String meetingID) {
Expand Down

0 comments on commit 5ba3405

Please sign in to comment.