Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor (graphql-middleware): Improve rate limiter and others #19839

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bbb-graphql-middleware/bbb-graphql-middleware-config.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ BBB_GRAPHQL_MIDDLEWARE_LISTEN_PORT=8378
BBB_GRAPHQL_MIDDLEWARE_REDIS_ADDRESS=127.0.0.1:6379
BBB_GRAPHQL_MIDDLEWARE_REDIS_PASSWORD=
BBB_GRAPHQL_MIDDLEWARE_HASURA_WS=ws://127.0.0.1:8080/v1/graphql
BBB_GRAPHQL_MIDDLEWARE_RATE_LIMIT_IN_MS=50
BBB_GRAPHQL_MIDDLEWARE_MAX_CONN_PER_SECOND=10

# If you are running a cluster proxy setup, you need to configure the Origin of
# the frontend. See https://docs.bigbluebutton.org/administration/cluster-proxy
Expand Down
21 changes: 12 additions & 9 deletions bbb-graphql-middleware/cmd/bbb-graphql-middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/iMDT/bbb-graphql-middleware/internal/common"
"github.com/iMDT/bbb-graphql-middleware/internal/msgpatch"
"github.com/iMDT/bbb-graphql-middleware/internal/websrv"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"net/http"
"os"
"strconv"
Expand Down Expand Up @@ -53,20 +53,23 @@ func main() {
}

//Define new Connections Rate Limit
rateLimitInMs := 50
if envRateLimitInMs := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_RATE_LIMIT_IN_MS"); envRateLimitInMs != "" {
if envRateLimitInMsAsInt, err := strconv.Atoi(envRateLimitInMs); err == nil {
rateLimitInMs = envRateLimitInMsAsInt
maxConnPerSecond := 10
if envMaxConnPerSecond := os.Getenv("BBB_GRAPHQL_MIDDLEWARE_MAX_CONN_PER_SECOND"); envMaxConnPerSecond != "" {
if envMaxConnPerSecondAsInt, err := strconv.Atoi(envMaxConnPerSecond); err == nil {
maxConnPerSecond = envMaxConnPerSecondAsInt
}
}
limiterInterval := rate.NewLimiter(rate.Every(time.Duration(rateLimitInMs)*time.Millisecond), 1)
rateLimiter := common.NewCustomRateLimiter(maxConnPerSecond)

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
defer cancel()

if err := limiterInterval.Wait(ctx); err != nil {
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
if err := rateLimiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
http.Error(w, "Request cancelled or rate limit exceeded", http.StatusTooManyRequests)
}

return
}

Expand Down
64 changes: 64 additions & 0 deletions bbb-graphql-middleware/internal/common/CustomRateLimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package common

import (
"context"
"time"
)

type CustomRateLimiter struct {
tokens chan struct{}
requestQueue chan context.Context
}

func NewCustomRateLimiter(requestsPerSecond int) *CustomRateLimiter {
rl := &CustomRateLimiter{
tokens: make(chan struct{}, requestsPerSecond),
requestQueue: make(chan context.Context, 20000), // Adjust the size accordingly
}

go rl.refillTokens(requestsPerSecond)
go rl.processQueue()

return rl
}

func (rl *CustomRateLimiter) refillTokens(requestsPerSecond int) {
ticker := time.NewTicker(time.Second / time.Duration(requestsPerSecond))
for {
select {
case <-ticker.C:
// Try to add a token, skip if full
select {
case rl.tokens <- struct{}{}:
default:
}
}
}
}

func (rl *CustomRateLimiter) processQueue() {
for ctx := range rl.requestQueue {
select {
case <-rl.tokens:
if ctx.Err() == nil {
// Token acquired and context not cancelled, proceed
// Simulate processing by calling a dummy function
// processRequest() or similar
}
case <-ctx.Done():
// Context cancelled, skip
}
}
}

func (rl *CustomRateLimiter) Wait(ctx context.Context) error {
rl.requestQueue <- ctx
select {
case <-ctx.Done():
// Request cancelled
return ctx.Err()
case <-rl.tokens:
// Acquired token, proceed
return nil
}
}
78 changes: 73 additions & 5 deletions bbb-graphql-middleware/internal/websrv/connhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {
defer func() {
msgpatch.RemoveConnCacheDir(browserConnectionId)
BrowserConnectionsMutex.Lock()
sessionTokenRemoved := BrowserConnections[browserConnectionId].SessionToken
delete(BrowserConnections, browserConnectionId)
_, bcExists := BrowserConnections[browserConnectionId]
if bcExists {
sessionTokenRemoved := BrowserConnections[browserConnectionId].SessionToken
delete(BrowserConnections, browserConnectionId)
go SendUserGraphqlConnectionClosedSysMsg(sessionTokenRemoved, browserConnectionId)
}
BrowserConnectionsMutex.Unlock()
go SendUserGraphqlConnectionClosedSysMsg(sessionTokenRemoved, browserConnectionId)

log.Infof("connection removed")
}()
Expand Down Expand Up @@ -135,7 +138,71 @@ func ConnectionHandler(w http.ResponseWriter, r *http.Request) {

func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
BrowserConnectionsMutex.RLock()
connectionsToProcess := make([]*common.BrowserConnection, 0)
for _, browserConnection := range BrowserConnections {
if browserConnection.SessionToken == sessionTokenToInvalidate {
connectionsToProcess = append(connectionsToProcess, browserConnection)
}
}
BrowserConnectionsMutex.RUnlock()

var wg sync.WaitGroup
for _, browserConnection := range connectionsToProcess {
wg.Add(1)
go func(bc *common.BrowserConnection) {
defer wg.Done()
invalidateBrowserConnection(bc, sessionTokenToInvalidate)
}(browserConnection)
}
wg.Wait()
}

func invalidateBrowserConnection(bc *common.BrowserConnection, sessionToken string) {
if bc.HasuraConnection == nil {
return // If there's no Hasura connection, there's nothing to invalidate.
}

hasuraConnectionId := bc.HasuraConnection.Id

// Send message to stop receiving new messages from the browser.
bc.HasuraConnection.FreezeMsgFromBrowserChan.Send(true)

// Wait until there are no active mutations.
for iterationCount := 0; iterationCount < 20; iterationCount++ {
activeMutationFound := false
bc.ActiveSubscriptionsMutex.RLock()
for _, subscription := range bc.ActiveSubscriptions {
if subscription.Type == common.Mutation {
activeMutationFound = true
break
}
}
bc.ActiveSubscriptionsMutex.RUnlock()

if !activeMutationFound {
break // Exit the loop if no active mutations are found.
}
time.Sleep(100 * time.Millisecond) // Wait a bit before checking again.
}

log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionToken, hasuraConnectionId)

// Cancel the Hasura connection context to clean up resources.
if bc.HasuraConnection != nil && bc.HasuraConnection.ContextCancelFunc != nil {
bc.HasuraConnection.ContextCancelFunc()
}

log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionToken, hasuraConnectionId)

// Send a reconnection confirmation message
go SendUserGraphqlReconnectionForcedEvtMsg(sessionToken)
}

func InvalidateSessionTokenConnectionsB(sessionTokenToInvalidate string) {
BrowserConnectionsMutex.RLock()
for _, browserConnection := range BrowserConnections {
hasuraConnectionId := browserConnection.HasuraConnection.Id

if browserConnection.SessionToken == sessionTokenToInvalidate {
if browserConnection.HasuraConnection != nil {
//Send message to force stop receiving new messages from the browser
Expand All @@ -159,9 +226,10 @@ func InvalidateSessionTokenConnections(sessionTokenToInvalidate string) {
time.Sleep(100 * time.Millisecond)
}

hasuraConnectionId := browserConnection.HasuraConnection.Id
log.Debugf("Processing invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, hasuraConnectionId)
browserConnection.HasuraConnection.ContextCancelFunc()
if browserConnection.HasuraConnection != nil {
browserConnection.HasuraConnection.ContextCancelFunc()
}
log.Debugf("Processed invalidate request for sessionToken %v (hasura connection %v)", sessionTokenToInvalidate, hasuraConnectionId)

go SendUserGraphqlReconnectionForcedEvtMsg(browserConnection.SessionToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const ConnectionManager: React.FC<ConnectionManagerProps> = ({ children }): Reac
const subscription = new SubscriptionClient(graphqlUrl, {
reconnect: true,
timeout: 30000,
minTimeout: 30000,
connectionParams: {
headers: {
'X-Session-Token': sessionToken,
Expand Down