Skip to content

Commit

Permalink
Merge pull request #19611 from gustavotrott/graphql-user-left-on-clos…
Browse files Browse the repository at this point in the history
…e-conn

refactor (akka-apps): Switching from Meteor to GraphQL to determine whether user is connected
  • Loading branch information
gustavotrott committed Feb 12, 2024
2 parents 833296f + f8e008e commit f7cf603
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 28 deletions.
2 changes: 1 addition & 1 deletion akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object Boot extends App with SystemConfiguration {
)

val graphqlActionsActor = system.actorOf(
GraphqlActionsActor.props(system),
GraphqlActionsActor.props(system, eventBus),
"GraphqlActionsActor"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.bigbluebutton.core.api

import org.bigbluebutton.core.apps.users.UserEstablishedGraphqlConnectionInternalMsgHdlr
import org.bigbluebutton.core.domain.{ BreakoutUser, BreakoutVoiceUser }
import spray.json.JsObject
case class InMessageHeader(name: String)
Expand Down Expand Up @@ -126,6 +127,18 @@ case class SetPresenterInDefaultPodInternalMsg(presenterId: String) extends InMe
*/
case class CaptureSharedNotesReqInternalMsg(breakoutId: String, filename: String) extends InMessage

/**
* Sent by GraphqlActionsActor to inform MeetingActor that user disconnected
* @param userId
*/
case class UserClosedAllGraphqlConnectionsInternalMsg(userId: String) extends InMessage

/**
* Sent by GraphqlActionsActor to inform MeetingActor that user came back from disconnection
* @param userId
*/
case class UserEstablishedGraphqlConnectionInternalMsg(userId: String) extends InMessage

// DeskShare
case class DeskShareStartedRequest(conferenceName: String, callerId: String, callerIdName: String) extends InMessage
case class DeskShareStoppedRequest(conferenceName: String, callerId: String, callerIdName: String) extends InMessage
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.bigbluebutton.core.apps.users

import org.bigbluebutton.core.api.UserEstablishedGraphqlConnectionInternalMsg
import org.bigbluebutton.core.domain.MeetingState2x
import org.bigbluebutton.core.models.Users2x
import org.bigbluebutton.core.running.{ HandlerHelpers, LiveMeeting, MeetingActor, OutMsgRouter }

trait UserEstablishedGraphqlConnectionInternalMsgHdlr extends HandlerHelpers {
this: MeetingActor =>

val liveMeeting: LiveMeeting
val outGW: OutMsgRouter

def handleUserEstablishedGraphqlConnectionInternalMsg(msg: UserEstablishedGraphqlConnectionInternalMsg, state: MeetingState2x): MeetingState2x = {
log.info("Received user established a graphql connection. user {} meetingId={}", msg.userId, liveMeeting.props.meetingProp.intId)
Users2x.findWithIntId(liveMeeting.users2x, msg.userId) match {
case Some(reconnectingUser) =>
if (reconnectingUser.userLeftFlag.left) {
log.info("Resetting flag that user left meeting. user {}", msg.userId)
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.userId, leftFlag = false)
Users2x.resetUserLeftFlag(liveMeeting.users2x, msg.userId)
}
state
case None =>
state
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ trait UserJoinMeetingAfterReconnectReqMsgHdlr extends HandlerHelpers with UserJo
if (reconnectingUser.userLeftFlag.left) {
log.info("Resetting flag that user left meeting. user {}", msg.body.userId)
// User has reconnected. Just reset it's flag. ralam Oct 23, 2018
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.body.userId, false)
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.body.userId, leftFlag = false)
Users2x.resetUserLeftFlag(liveMeeting.users2x, msg.body.userId)
}
state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ trait UserJoinMeetingReqMsgHdlr extends HandlerHelpers {

private def resetUserLeftFlag(msg: UserJoinMeetingReqMsg) = {
log.info("Resetting flag that user left meeting. user {}", msg.body.userId)
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.body.userId, false)
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.body.userId, leftFlag = false)
Users2x.resetUserLeftFlag(liveMeeting.users2x, msg.body.userId)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.bigbluebutton.core.apps.users

import org.bigbluebutton.common2.msgs.UserLeaveReqMsg
import org.bigbluebutton.core.api.{ UserClosedAllGraphqlConnectionsInternalMsg }
import org.bigbluebutton.core.domain.MeetingState2x
import org.bigbluebutton.core.models.{ RegisteredUsers, Users2x }
import org.bigbluebutton.core.running.{ HandlerHelpers, MeetingActor, OutMsgRouter }
Expand All @@ -12,23 +13,33 @@ trait UserLeaveReqMsgHdlr extends HandlerHelpers {
val outGW: OutMsgRouter

def handleUserLeaveReqMsg(msg: UserLeaveReqMsg, state: MeetingState2x): MeetingState2x = {
Users2x.findWithIntId(liveMeeting.users2x, msg.body.userId) match {
handleUserLeaveReq(msg.body.userId, msg.header.meetingId, msg.body.loggedOut, state)
}

def handleUserClosedAllGraphqlConnectionsInternalMsg(msg: UserClosedAllGraphqlConnectionsInternalMsg, state: MeetingState2x): MeetingState2x = {
log.info("Received user closed all graphql connections. user {} meetingId={}", msg.userId, liveMeeting.props.meetingProp.intId)

handleUserLeaveReq(msg.userId, liveMeeting.props.meetingProp.intId, loggedOut = false, state)
}

def handleUserLeaveReq(userId: String, meetingId: String, loggedOut: Boolean, state: MeetingState2x): MeetingState2x = {
Users2x.findWithIntId(liveMeeting.users2x, userId) match {
case Some(reconnectingUser) =>
log.info("Received user left meeting. user {} meetingId={}", msg.body.userId, msg.header.meetingId)
log.info("Received user left meeting. user {} meetingId={}", userId, meetingId)
if (!reconnectingUser.userLeftFlag.left) {
log.info("Setting user left flag. user {} meetingId={}", msg.body.userId, msg.header.meetingId)
log.info("Setting user left flag. user {} meetingId={}", userId, meetingId)
// Just flag that user has left as the user might be reconnecting.
// An audit will remove this user if it hasn't rejoined after a certain period of time.
// ralam oct 23, 2018
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, msg.body.userId, true)
sendUserLeftFlagUpdatedEvtMsg(outGW, liveMeeting, userId, leftFlag = true)

Users2x.setUserLeftFlag(liveMeeting.users2x, msg.body.userId)
Users2x.setUserLeftFlag(liveMeeting.users2x, userId)
}
if (msg.body.loggedOut) {
log.info("Setting user logged out flag. user {} meetingId={}", msg.body.userId, msg.header.meetingId)
if (loggedOut) {
log.info("Setting user logged out flag. user {} meetingId={}", userId, meetingId)

for {
ru <- RegisteredUsers.findWithUserId(msg.body.userId, liveMeeting.registeredUsers)
ru <- RegisteredUsers.findWithUserId(userId, liveMeeting.registeredUsers)
} yield {
RegisteredUsers.setUserLoggedOutFlag(liveMeeting.registeredUsers, ru)
Sender.sendForceUserGraphqlReconnectionSysMsg(liveMeeting.props.meetingProp.intId, ru.id, ru.sessionToken, "user_loggedout", outGW)
Expand All @@ -39,4 +50,5 @@ trait UserLeaveReqMsgHdlr extends HandlerHelpers {
state
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ case class UserGraphqlConnectionDbModel (
graphqlConnectionId: Option[Int],
sessionToken: String,
middlewareConnectionId: String,
stablishedAt: java.sql.Timestamp,
establishedAt: java.sql.Timestamp,
closedAt: Option[java.sql.Timestamp],
)

class UserGraphqlConnectionDbTableDef(tag: Tag) extends Table[UserGraphqlConnectionDbModel](tag, None, "user_graphqlConnection") {
override def * = (
graphqlConnectionId, sessionToken, middlewareConnectionId, stablishedAt, closedAt
graphqlConnectionId, sessionToken, middlewareConnectionId, establishedAt, closedAt
) <> (UserGraphqlConnectionDbModel.tupled, UserGraphqlConnectionDbModel.unapply)
val graphqlConnectionId = column[Option[Int]]("graphqlConnectionId", O.PrimaryKey, O.AutoInc)
val sessionToken = column[String]("sessionToken")
val middlewareConnectionId = column[String]("middlewareConnectionId")
val stablishedAt = column[java.sql.Timestamp]("stablishedAt")
val establishedAt = column[java.sql.Timestamp]("establishedAt")
val closedAt = column[Option[java.sql.Timestamp]]("closedAt")
}

Expand All @@ -34,7 +34,7 @@ object UserGraphqlConnectionDAO {
graphqlConnectionId = None,
sessionToken = sessionToken,
middlewareConnectionId = middlewareConnectionId,
stablishedAt = new java.sql.Timestamp(System.currentTimeMillis()),
establishedAt = new java.sql.Timestamp(System.currentTimeMillis()),
closedAt = None
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class MeetingActor(

with UserJoinMeetingReqMsgHdlr
with UserJoinMeetingAfterReconnectReqMsgHdlr
with UserEstablishedGraphqlConnectionInternalMsgHdlr
with UserConnectedToGlobalAudioMsgHdlr
with UserDisconnectedFromGlobalAudioMsgHdlr
with MuteAllExceptPresentersCmdMsgHdlr
Expand Down Expand Up @@ -266,8 +267,14 @@ class MeetingActor(
// internal messages
case msg: MonitorNumberOfUsersInternalMsg => handleMonitorNumberOfUsers(msg)
case msg: SetPresenterInDefaultPodInternalMsg => state = presentationPodsApp.handleSetPresenterInDefaultPodInternalMsg(msg, state, liveMeeting, msgBus)

case msg: ExtendMeetingDuration => handleExtendMeetingDuration(msg)
case msg: UserClosedAllGraphqlConnectionsInternalMsg =>
state = handleUserClosedAllGraphqlConnectionsInternalMsg(msg, state)
updateModeratorsPresence()
case msg: UserEstablishedGraphqlConnectionInternalMsg =>
state = handleUserEstablishedGraphqlConnectionInternalMsg(msg, state)
updateModeratorsPresence()

case msg: ExtendMeetingDuration => handleExtendMeetingDuration(msg)
case msg: SendTimeRemainingAuditInternalMsg =>
if (!liveMeeting.props.meetingProp.isBreakout) {
// Update users of meeting remaining time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,43 @@ package org.bigbluebutton.endpoint.redis

import org.apache.pekko.actor.{Actor, ActorLogging, ActorSystem, Props}
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.api.{UserClosedAllGraphqlConnectionsInternalMsg, UserEstablishedGraphqlConnectionInternalMsg}
import org.bigbluebutton.core.bus.{BigBlueButtonEvent, InternalEventBus}
import org.bigbluebutton.core.db.UserGraphqlConnectionDAO

object GraphqlActionsActor {
def props(system: ActorSystem): Props =
def props(system: ActorSystem,
eventBus: InternalEventBus,
): Props =
Props(
classOf[GraphqlActionsActor],
system,
eventBus,
)
}

case class GraphqlUser(
intId: String,
meetingId: String,
sessionToken: String,
)

case class GraphqlUserConnection(
middlewareUID: String,
browserConnectionId: String,
sessionToken: String,
user: GraphqlUser,
)


class GraphqlActionsActor(
system: ActorSystem,
val eventBus: InternalEventBus,
) extends Actor with ActorLogging {

private var users: Map[String, GraphqlUser] = Map()
private var graphqlConnections: Map[String, GraphqlUserConnection] = Map()

def receive = {
//=============================
// 2x messages
Expand All @@ -25,19 +48,62 @@ class GraphqlActionsActor(

private def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = {
msg.core match {
case m: RegisterUserReqMsg => handleUserRegisteredRespMsg(m)
case m: DestroyMeetingSysCmdMsg => handleDestroyMeetingSysCmdMsg(m)
// Messages from bbb-graphql-middleware
case m: UserGraphqlConnectionEstablishedSysMsg => handleUserGraphqlConnectionEstablishedSysMsg(m)
case m: UserGraphqlConnectionEstablishedSysMsg => handleUserGraphqlConnectionEstablishedSysMsg(m)
case m: UserGraphqlConnectionClosedSysMsg => handleUserGraphqlConnectionClosedSysMsg(m)
case _ => // message not to be handled.
case _ => // message not to be handled.
}
}

private def handleUserGraphqlConnectionEstablishedSysMsg(msg: UserGraphqlConnectionEstablishedSysMsg) {
private def handleUserRegisteredRespMsg(msg: RegisterUserReqMsg): Unit = {
users += (msg.body.sessionToken -> GraphqlUser(
msg.body.intUserId,
msg.body.meetingId,
msg.body.sessionToken
))
}

private def handleDestroyMeetingSysCmdMsg(msg: DestroyMeetingSysCmdMsg): Unit = {
users = users.filter(u => u._2.meetingId != msg.body.meetingId)
graphqlConnections = graphqlConnections.filter(c => c._2.user.meetingId != msg.body.meetingId)
}

private def handleUserGraphqlConnectionEstablishedSysMsg(msg: UserGraphqlConnectionEstablishedSysMsg): Unit = {
UserGraphqlConnectionDAO.insert(msg.body.sessionToken, msg.body.browserConnectionId)

for {
user <- users.get(msg.body.sessionToken)
} yield {

//Send internal message informing user has connected
if (!graphqlConnections.values.exists(c => c.sessionToken == msg.body.sessionToken)) {
eventBus.publish(BigBlueButtonEvent(user.meetingId, UserEstablishedGraphqlConnectionInternalMsg(user.intId)))
}

graphqlConnections += (msg.body.browserConnectionId -> GraphqlUserConnection(
msg.body.middlewareUID,
msg.body.browserConnectionId,
msg.body.sessionToken,
user
))
}
}

private def handleUserGraphqlConnectionClosedSysMsg(msg: UserGraphqlConnectionClosedSysMsg) {
private def handleUserGraphqlConnectionClosedSysMsg(msg: UserGraphqlConnectionClosedSysMsg): Unit = {
UserGraphqlConnectionDAO.updateClosed(msg.body.sessionToken, msg.body.browserConnectionId)

for {
user <- users.get(msg.body.sessionToken)
} yield {
graphqlConnections = graphqlConnections.-(msg.body.browserConnectionId)

//Send internal message informing user disconnected
if (!graphqlConnections.values.exists(c => c.sessionToken == msg.body.sessionToken)) {
eventBus.publish(BigBlueButtonEvent(user.meetingId, UserClosedAllGraphqlConnectionsInternalMsg(user.intId)))
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,21 @@ case class UserGraphqlReconnectionForcedEvtMsg(
header: BbbCoreBaseHeader,
body: UserGraphqlReconnectionForcedEvtMsgBody
) extends BbbCoreMsg
case class UserGraphqlReconnectionForcedEvtMsgBody(sessionToken: String, browserConnectionId: String)
case class UserGraphqlReconnectionForcedEvtMsgBody(middlewareUID: String, sessionToken: String, browserConnectionId: String)

object UserGraphqlConnectionEstablishedSysMsg { val NAME = "UserGraphqlConnectionEstablishedSysMsg" }
case class UserGraphqlConnectionEstablishedSysMsg(
header: BbbCoreBaseHeader,
body: UserGraphqlConnectionEstablishedSysMsgBody
) extends BbbCoreMsg
case class UserGraphqlConnectionEstablishedSysMsgBody(sessionToken: String, browserConnectionId: String)
case class UserGraphqlConnectionEstablishedSysMsgBody(middlewareUID: String, sessionToken: String, browserConnectionId: String)

object UserGraphqlConnectionClosedSysMsg { val NAME = "UserGraphqlConnectionClosedSysMsg" }
case class UserGraphqlConnectionClosedSysMsg(
header: BbbCoreBaseHeader,
body: UserGraphqlConnectionClosedSysMsgBody
) extends BbbCoreMsg
case class UserGraphqlConnectionClosedSysMsgBody(sessionToken: String, browserConnectionId: String)
case class UserGraphqlConnectionClosedSysMsgBody(middlewareUID: String, sessionToken: String, browserConnectionId: String)

/**
* Sent from akka-apps to bbb-web to inform a summary of the meeting activities
Expand Down
4 changes: 4 additions & 0 deletions bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
"github.com/iMDT/bbb-graphql-middleware/internal/websrv"
log "github.com/sirupsen/logrus"
Expand All @@ -21,6 +22,9 @@ func main() {
log.SetFormatter(&log.JSONFormatter{})
log := log.WithField("_routine", "main")

common.InitUniqueID()
log = log.WithField("graphql-middleware-uid", common.GetUniqueID())

log.Infof("Logger level=%v", log.Logger.Level)

//Clear cache from last exec
Expand Down
1 change: 1 addition & 0 deletions bbb-graphql-middleware/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/evanphx/json-patch v0.5.2 // indirect
github.com/google/uuid v1.6.0 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
)
2 changes: 2 additions & 0 deletions bbb-graphql-middleware/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/mattbaird/jsonpatch v0.0.0-20230413205102-771768614e91 h1:JnZSkFP1/GLwKCEuuWVhsacvbDQIVa5BRwAwd+9k2Vw=
github.com/mattbaird/jsonpatch v0.0.0-20230413205102-771768614e91/go.mod h1:M1qoD/MqPgTZIk0EWKB38wE28ACRfVcn+cU08jyArI0=
Expand Down
15 changes: 15 additions & 0 deletions bbb-graphql-middleware/internal/common/GlobalState.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package common

import (
"github.com/google/uuid"
)

var uniqueID string

func InitUniqueID() {
uniqueID = uuid.New().String()
}

func GetUniqueID() string {
return uniqueID
}

0 comments on commit f7cf603

Please sign in to comment.