Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: User Connection Status flow #19742

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.bigbluebutton.core.apps.users

import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.apps.RightsManagementTrait
import org.bigbluebutton.core.db.UserConnectionStatusDAO
import org.bigbluebutton.core.models.{ UserState, Users2x }
import org.bigbluebutton.core.running.{ LiveMeeting, OutMsgRouter }

trait UserConnectionAliveReqMsgHdlr extends RightsManagementTrait {
this: UsersApp =>

val liveMeeting: LiveMeeting
val outGW: OutMsgRouter

def handleUserConnectionAliveReqMsg(msg: UserConnectionAliveReqMsg): Unit = {
log.info("handleUserConnectionAliveReqMsg: userId={}", msg.body.userId)

for {
user <- Users2x.findWithIntId(liveMeeting.users2x, msg.body.userId)
} yield {
UserConnectionStatusDAO.updateUserAlive(user.intId)
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.bigbluebutton.core.apps.users

import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.apps.RightsManagementTrait
import org.bigbluebutton.core.db.UserConnectionStatusDAO
import org.bigbluebutton.core.models.{ UserState, Users2x }
import org.bigbluebutton.core.running.{ LiveMeeting, OutMsgRouter }

trait UserConnectionUpdateRttReqMsgHdlr extends RightsManagementTrait {
this: UsersApp =>

val liveMeeting: LiveMeeting
val outGW: OutMsgRouter

def handleUserConnectionUpdateRttReqMsg(msg: UserConnectionUpdateRttReqMsg): Unit = {
log.info("handleUserConnectionUpdateRttReqMsg: networkRttInMs={} userId={}", msg.body.networkRttInMs, msg.body.userId)

for {
user <- Users2x.findWithIntId(liveMeeting.users2x, msg.body.userId)
} yield {
UserConnectionStatusDAO.updateUserRtt(user.intId, msg.body.networkRttInMs)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ class UsersApp(
with AssignPresenterReqMsgHdlr
with ChangeUserPinStateReqMsgHdlr
with ChangeUserMobileFlagReqMsgHdlr
with UserConnectionAliveReqMsgHdlr
with UserConnectionUpdateRttReqMsgHdlr
with ChangeUserReactionEmojiReqMsgHdlr
with ChangeUserRaiseHandReqMsgHdlr
with ChangeUserAwayReqMsgHdlr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{ Failure, Success }

case class UserConnectionStatusDbModel(
userId: String,
meetingId: String,
connectionAliveAt: Option[java.sql.Timestamp]
userId: String,
meetingId: String,
connectionAliveAt: Option[java.sql.Timestamp],
userClientResponseAt: Option[java.sql.Timestamp],
networkRttInMs: Option[Double]
)

class UserConnectionStatusDbTableDef(tag: Tag) extends Table[UserConnectionStatusDbModel](tag, None, "user_connectionStatus") {
override def * = (
userId, meetingId, connectionAliveAt
userId, meetingId, connectionAliveAt, userClientResponseAt, networkRttInMs
) <> (UserConnectionStatusDbModel.tupled, UserConnectionStatusDbModel.unapply)
val userId = column[String]("userId", O.PrimaryKey)
val meetingId = column[String]("meetingId")
val connectionAliveAt = column[Option[java.sql.Timestamp]]("connectionAliveAt")
val userClientResponseAt = column[Option[java.sql.Timestamp]]("userClientResponseAt")
val networkRttInMs = column[Option[Double]]("networkRttInMs")
}

object UserConnectionStatusDAO {
Expand All @@ -27,7 +31,9 @@ object UserConnectionStatusDAO {
UserConnectionStatusDbModel(
userId = userId,
meetingId = meetingId,
connectionAliveAt = None
connectionAliveAt = None,
userClientResponseAt = None,
networkRttInMs = None
)
)
).onComplete {
Expand All @@ -36,4 +42,28 @@ object UserConnectionStatusDAO {
}
}

def updateUserAlive(userId: String) = {
DatabaseConnection.db.run(
TableQuery[UserConnectionStatusDbTableDef]
.filter(_.userId === userId)
.map(t => (t.connectionAliveAt))
.update(Some(new java.sql.Timestamp(System.currentTimeMillis())))
).onComplete {
case Success(rowsAffected) => DatabaseConnection.logger.debug(s"$rowsAffected row(s) updated connectionAliveAt on UserConnectionStatus table!")
case Failure(e) => DatabaseConnection.logger.debug(s"Error updating connectionAliveAt on UserConnectionStatus: $e")
}
}

def updateUserRtt(userId: String, networkRttInMs: Double) = {
DatabaseConnection.db.run(
TableQuery[UserConnectionStatusDbTableDef]
.filter(_.userId === userId)
.map(t => (t.networkRttInMs, t.userClientResponseAt))
.update((Some(networkRttInMs), Some(new java.sql.Timestamp(System.currentTimeMillis()))))
).onComplete {
case Success(rowsAffected) => DatabaseConnection.logger.debug(s"$rowsAffected row(s) updated networkRttInMs on UserConnectionStatus table!")
case Failure(e) => DatabaseConnection.logger.debug(s"Error updating networkRttInMs on UserConnectionStatus: $e")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class ReceivedJsonMsgHandlerActor(
routeGenericMsg[ChangeUserPinStateReqMsg](envelope, jsonNode)
case ChangeUserMobileFlagReqMsg.NAME =>
routeGenericMsg[ChangeUserMobileFlagReqMsg](envelope, jsonNode)
case UserConnectionAliveReqMsg.NAME =>
routeGenericMsg[UserConnectionAliveReqMsg](envelope, jsonNode)
case UserConnectionUpdateRttReqMsg.NAME =>
routeGenericMsg[UserConnectionUpdateRttReqMsg](envelope, jsonNode)
case SetUserSpeechLocaleReqMsg.NAME =>
routeGenericMsg[SetUserSpeechLocaleReqMsg](envelope, jsonNode)
case SelectRandomViewerReqMsg.NAME =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ class MeetingActor(
case m: SelectRandomViewerReqMsg => usersApp.handleSelectRandomViewerReqMsg(m)
case m: ChangeUserPinStateReqMsg => usersApp.handleChangeUserPinStateReqMsg(m)
case m: ChangeUserMobileFlagReqMsg => usersApp.handleChangeUserMobileFlagReqMsg(m)
case m: UserConnectionAliveReqMsg => usersApp.handleUserConnectionAliveReqMsg(m)
case m: UserConnectionUpdateRttReqMsg => usersApp.handleUserConnectionUpdateRttReqMsg(m)
case m: SetUserSpeechLocaleReqMsg => usersApp.handleSetUserSpeechLocaleReqMsg(m)

// Client requested to eject user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class AnalyticsActor(val includeChat: Boolean) extends Actor with ActorLogging {
case m: AssignPresenterReqMsg => logMessage(msg)
case m: ChangeUserPinStateReqMsg => logMessage(msg)
case m: ChangeUserMobileFlagReqMsg => logMessage(msg)
case m: UserConnectionAliveReqMsg => logMessage(msg)
case m: UserConnectionUpdateRttReqMsg => logMessage(msg)
case m: ScreenshareRtmpBroadcastStartedVoiceConfEvtMsg => logMessage(msg)
case m: ScreenshareRtmpBroadcastStoppedVoiceConfEvtMsg => logMessage(msg)
case m: ScreenshareRtmpBroadcastStartedEvtMsg => logMessage(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,20 @@ object ChangeUserMobileFlagReqMsg { val NAME = "ChangeUserMobileFlagReqMsg" }
case class ChangeUserMobileFlagReqMsg(header: BbbClientMsgHeader, body: ChangeUserMobileFlagReqMsgBody) extends StandardMsg
case class ChangeUserMobileFlagReqMsgBody(userId: String, mobile: Boolean)

/**
* Sent from client to inform the connection is alive.
*/
object UserConnectionAliveReqMsg { val NAME = "UserConnectionAliveReqMsg" }
case class UserConnectionAliveReqMsg(header: BbbClientMsgHeader, body: UserConnectionAliveReqMsgBody) extends StandardMsg
case class UserConnectionAliveReqMsgBody(userId: String)

/**
* Sent from client to inform the RTT (time it took to send the Alive and receive confirmation).
*/
object UserConnectionUpdateRttReqMsg { val NAME = "UserConnectionUpdateRttReqMsg" }
case class UserConnectionUpdateRttReqMsg(header: BbbClientMsgHeader, body: UserConnectionUpdateRttReqMsgBody) extends StandardMsg
case class UserConnectionUpdateRttReqMsgBody(userId: String, networkRttInMs: Double)

/**
* Sent to all clients about a user mobile flag.
*/
Expand Down
22 changes: 22 additions & 0 deletions bbb-graphql-actions/src/actions/userSetConnectionAlive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { RedisMessage } from '../types';

export default function buildRedisMessage(sessionVariables: Record<string, unknown>, input: Record<string, unknown>): RedisMessage {
const eventName = `UserConnectionAliveReqMsg`;

const routing = {
meetingId: sessionVariables['x-hasura-meetingid'] as String,
userId: sessionVariables['x-hasura-userid'] as String
};

const header = {
name: eventName,
meetingId: routing.meetingId,
userId: routing.userId
};

const body = {
userId: routing.userId,
};

return { eventName, routing, header, body };
}
23 changes: 23 additions & 0 deletions bbb-graphql-actions/src/actions/userSetConnectionRtt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { RedisMessage } from '../types';

export default function buildRedisMessage(sessionVariables: Record<string, unknown>, input: Record<string, unknown>): RedisMessage {
const eventName = `UserConnectionUpdateRttReqMsg`;

const routing = {
meetingId: sessionVariables['x-hasura-meetingid'] as String,
userId: sessionVariables['x-hasura-userid'] as String
};

const header = {
name: eventName,
meetingId: routing.meetingId,
userId: routing.userId
};

const body = {
userId: routing.userId,
networkRttInMs: input.networkRttInMs
};

return { eventName, routing, header, body };
}
23 changes: 6 additions & 17 deletions bbb-graphql-client-test/src/UserConnectionStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,10 @@ export default function UserConnectionStatus() {
//where is not necessary once user can update only its own status
//Hasura accepts "now()" as value to timestamp fields
const [updateUserClientResponseAtToMeAsNow] = useMutation(gql`
mutation UpdateConnectionClientResponse($networkRttInMs: numeric) {
update_user_connectionStatus(
where: {userClientResponseAt: {_is_null: true}}
_set: {
userClientResponseAt: "now()",
networkRttInMs: $networkRttInMs
}
) {
affected_rows
}
mutation UpdateConnectionRtt($networkRttInMs: Float!) {
userSetConnectionRtt(
networkRttInMs: $networkRttInMs
)
}
`);

Expand All @@ -50,13 +44,8 @@ export default function UserConnectionStatus() {


const [updateConnectionAliveAtToMeAsNow] = useMutation(gql`
mutation UpdateConnectionAliveAt($userId: String, $connectionAliveAt: timestamp) {
update_user_connectionStatus(
where: {},
_set: { connectionAliveAt: "now()" }
) {
affected_rows
}
mutation UpdateConnectionAliveAt {
userSetConnectionAlive
}
`);

Expand Down
1 change: 0 additions & 1 deletion bbb-graphql-server/bbb-graphql-server.service
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ RestartSec=60
SuccessExitStatus=143
TimeoutStopSec=5
PermissionsStartOnly=true
LimitNOFILE=1024

[Install]
WantedBy=multi-user.target bigbluebutton.target
Expand Down
11 changes: 10 additions & 1 deletion bbb-graphql-server/metadata/actions.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,16 @@ type Mutation {
): Boolean
}

type Mutation {
userSetConnectionAlive: Boolean
}

type Mutation {
userSetConnectionRtt(
networkRttInMs: Float!
): Boolean
}

type Mutation {
userSetEmojiStatus(
emoji: String!
Expand Down Expand Up @@ -544,4 +554,3 @@ input GuestUserApprovalStatus {
guest: String!
status: String!
}

13 changes: 12 additions & 1 deletion bbb-graphql-server/metadata/actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,18 @@ actions:
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
permissions:
- role: bbb_client
- name: userSetConnectionAlive
definition:
kind: synchronous
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
permissions:
- role: bbb_client
- name: userSetConnectionRtt
definition:
kind: synchronous
handler: '{{HASURA_BBB_GRAPHQL_ACTIONS_ADAPTER_URL}}'
permissions:
- role: bbb_client
- name: userSetEmojiStatus
definition:
kind: synchronous
Expand Down Expand Up @@ -486,5 +498,4 @@ custom_types:
input_objects:
- name: BreakoutRoom
- name: GuestUserApprovalStatus
objects: []
scalars: []
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,3 @@ select_permissions:
_eq: X-Hasura-MeetingId
- userId:
_eq: X-Hasura-UserId
update_permissions:
- role: bbb_client
permission:
columns:
- connectionAliveAt
- userClientResponseAt
- networkRttInMs
filter:
_and:
- meetingId:
_eq: X-Hasura-MeetingId
- userId:
_eq: X-Hasura-UserId
check: null
set:
meetingId: x-hasura-MeetingId
userId: x-hasura-UserId
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { useEffect, useRef } from 'react';
import { useMutation, useSubscription } from '@apollo/client';
import { CONNECTION_STATUS_SUBSCRIPTION } from './queries';
import { UPDATE_CONNECTION_ALIVE_AT, UPDATE_USER_CLIENT_RESPONSE_AT } from './mutations';
import { UPDATE_CONNECTION_ALIVE_AT, UPDATE_USER_CLIENT_RTT } from './mutations';

const STATS_INTERVAL = Meteor.settings.public.stats.interval;

Expand All @@ -10,10 +10,10 @@ const ConnectionStatus = () => {
const lastStatusUpdatedAtReceived = useRef(null); // Ref to store the current timeout
const timeoutRef = useRef(null);

const [updateUserClientResponseAtToMeAsNow] = useMutation(UPDATE_USER_CLIENT_RESPONSE_AT);
const [updateUserClientRtt] = useMutation(UPDATE_USER_CLIENT_RTT);

const handleUpdateUserClientResponseAt = () => {
updateUserClientResponseAtToMeAsNow({
updateUserClientRtt({
variables: {
networkRttInMs: networkRttInMs.current,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
import { gql } from '@apollo/client';

export const UPDATE_CONNECTION_ALIVE_AT = gql`
mutation UpdateConnectionAliveAt($userId: String, $connectionAliveAt: timestamp) {
update_user_connectionStatus(
where: {},
_set: { connectionAliveAt: "now()" }
) {
affected_rows
}
mutation UpdateConnectionAliveAt {
userSetConnectionAlive
}`;

export const UPDATE_USER_CLIENT_RESPONSE_AT = gql`
mutation UpdateConnectionClientResponse($networkRttInMs: numeric) {
update_user_connectionStatus(
where: {userClientResponseAt: {_is_null: true}}
_set: {
userClientResponseAt: "now()",
networkRttInMs: $networkRttInMs
}
) {
affected_rows
}
export const UPDATE_USER_CLIENT_RTT = gql`
mutation UpdateConnectionRtt($networkRttInMs: Float!) {
userSetConnectionRtt(
networkRttInMs: $networkRttInMs
)
}`;

export default {
UPDATE_CONNECTION_ALIVE_AT,
UPDATE_USER_CLIENT_RESPONSE_AT,
UPDATE_USER_CLIENT_RTT,
};