Skip to content

Commit

Permalink
finally, fix the bug.
Browse files Browse the repository at this point in the history
Signed-off-by: Yonle <yonle@lecturify.net>
  • Loading branch information
Yonle committed Jun 30, 2024
1 parent 19f61b4 commit dd5dfcb
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 99 deletions.
228 changes: 135 additions & 93 deletions bouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"sync"
//"time"

"nhooyr.io/websocket"
Expand All @@ -16,12 +17,15 @@ type SessionSubs map[string][]interface{}

type MessageChan chan []interface{}

type ClientEvents []map[string]interface{}

type Session struct {
ClientIP string

ClientREQ MessageChan
ClientCLOSE MessageChan
ClientEVENT MessageChan
ClientREQ MessageChan
ClientCLOSE MessageChan
ClientEVENT MessageChan
clientEvents ClientEvents

events SessionEvents
pendingEOSE SessionEOSEs
Expand All @@ -30,6 +34,7 @@ type Session struct {
relay relayHandler.RelaySession

destroyed chan struct{}
once sync.Once
conn *websocket.Conn
ctx context.Context
}
Expand Down Expand Up @@ -65,127 +70,164 @@ func (s *Session) StartListening() {
continue listener
}

subID, ok1 := d[1].(string)

if !ok1 {
continue listener
}

if _, ok := s.events[subID]; !ok {
s.handleUpstreamEVENT(d)
case d, open := <-s.relay.UpEOSE:
if !open {
continue listener
}

event, ok2 := d[2].(map[string]interface{})
if !ok2 {
s.handleUpstreamEOSE(d)
case conn, open := <-s.relay.UpConnected:
if !open {
continue listener
}

eventID, ok3 := event["id"].(string)
s.resendEvents(conn)
s.reopenSubscriptions(conn)

if !ok3 {
case d, open := <-s.ClientREQ:
if !open {
continue listener
}
s.handleClientREQ(d)

if _, ok := s.events[subID][eventID]; ok {
case d, open := <-s.ClientCLOSE:
if !open {
continue listener
}
s.handleClientCLOSE(d)

s.events[subID][eventID] = struct{}{}
wsjson.Write(s.ctx, s.conn, d)

if _, ok := s.pendingEOSE[subID]; ok {
if len(s.events[subID]) >= 500 {
delete(s.pendingEOSE, subID)
wsjson.Write(s.ctx, s.conn, [2]string{"EOSE", subID})
}
}
case d, open := <-s.relay.UpEOSE:
case d, open := <-s.ClientEVENT:
if !open {
continue listener
}
s.handleClientEVENT(d)

subID, ok := d[1].(string)
if !ok {
continue listener
}
case <-s.destroyed:
break listener
}
}
}()
}

if _, ok := s.pendingEOSE[subID]; !ok {
continue listener
}
func (s *Session) reopenSubscriptions(conn *websocket.Conn) {
for subID, filters := range s.subscriptions {
data := append([]interface{}{"REQ", subID}, filters...)
wsjson.Write(s.ctx, conn, data)
}
}

s.pendingEOSE[subID]++
func (s *Session) resendEvents(conn *websocket.Conn) {
for _, event := range s.clientEvents {
data := [2]interface{}{"EVENT", event}
wsjson.Write(s.ctx, conn, data)
}
}

if s.pendingEOSE[subID] >= s.relay.HowManyRelaysAreConnected {
delete(s.pendingEOSE, subID)
wsjson.Write(s.ctx, s.conn, [2]string{"EOSE", subID})
}
case conn, open := <-s.relay.UpConnected:
if !open {
continue listener
}
func (s *Session) handleClientREQ(d []interface{}) {
subID, ok1 := d[1].(string)
if !ok1 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: received subID is not a string"})
return
}

for subID, filters := range s.subscriptions {
ReqData := append([]interface{}{"REQ", subID}, filters...)
wsjson.Write(s.ctx, conn, ReqData)
}
filters := d[2:]

case d, open := <-s.ClientREQ:
if !open {
continue listener
}
subID, ok1 := d[1].(string)
if !ok1 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: received subID is not a string"})
continue listener
}
s.subscriptions[subID] = filters
s.events[subID] = make(map[string]struct{})
s.pendingEOSE[subID] = 0

filters := d[2:]
s.once.Do(s.Start)
s.relay.Broadcast(d)
}

s.subscriptions[subID] = filters
s.events[subID] = make(map[string]struct{})
s.pendingEOSE[subID] = 0
func (s *Session) handleClientCLOSE(d []interface{}) {
subID, ok1 := d[1].(string)
if !ok1 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: received subID is not a string"})
return
}

s.relay.Broadcast(d)
delete(s.subscriptions, subID)
delete(s.events, subID)
delete(s.pendingEOSE, subID)

case d, open := <-s.ClientCLOSE:
if !open {
continue listener
}
subID, ok1 := d[1].(string)
if !ok1 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: received subID is not a string"})
continue listener
}
s.relay.Broadcast(d)
wsjson.Write(s.ctx, s.conn, [3]string{"CLOSED", subID, ""})
}

delete(s.subscriptions, subID)
delete(s.events, subID)
delete(s.pendingEOSE, subID)
func (s *Session) handleClientEVENT(d []interface{}) {
event, ok1 := d[1].(map[string]interface{})
if !ok1 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: invalid EVENT"})
return
}

s.relay.Broadcast(d)
wsjson.Write(s.ctx, s.conn, [3]string{"CLOSED", subID, ""})
id, ok2 := event["id"].(string)
if !ok2 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: invalid EVENT"})
return
}

case d, open := <-s.ClientEVENT:
if !open {
continue listener
}
event, ok1 := d[1].(map[string]interface{})
if !ok1 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: invalid EVENT"})
continue listener
}
s.once.Do(s.Start)

id, ok2 := event["id"].(string)
if !ok2 {
wsjson.Write(s.ctx, s.conn, [2]string{"NOTICE", "error: invalid EVENT"})
continue listener
}
s.relay.Broadcast(d)
s.clientEvents = append(s.clientEvents, event)

s.relay.Broadcast(d)
wsjson.Write(s.ctx, s.conn, [4]interface{}{"OK", id, true, ""})
}

wsjson.Write(s.ctx, s.conn, [4]interface{}{"OK", id, true, ""})
case <-s.destroyed:
break listener
}
func (s *Session) handleUpstreamEVENT(d []interface{}) {
subID, ok1 := d[1].(string)

if !ok1 {
return
}

if _, ok := s.events[subID]; !ok {
return
}

event, ok2 := d[2].(map[string]interface{})
if !ok2 {
return
}

eventID, ok3 := event["id"].(string)

if !ok3 {
return
}

if _, ok := s.events[subID][eventID]; ok {
return
}

s.events[subID][eventID] = struct{}{}
wsjson.Write(s.ctx, s.conn, d)

if _, ok := s.pendingEOSE[subID]; ok {
if len(s.events[subID]) >= 500 {
delete(s.pendingEOSE, subID)
wsjson.Write(s.ctx, s.conn, [2]string{"EOSE", subID})
}
}()
}
}

func (s *Session) handleUpstreamEOSE(d []interface{}) {
subID, ok := d[1].(string)
if !ok {
return
}

if _, ok := s.pendingEOSE[subID]; !ok {
return
}

s.pendingEOSE[subID]++

if s.pendingEOSE[subID] >= s.relay.HowManyRelaysAreConnected {
delete(s.pendingEOSE, subID)
wsjson.Write(s.ctx, s.conn, [2]string{"EOSE", subID})
}
}
6 changes: 0 additions & 6 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"net/http"
"sync"

"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
Expand Down Expand Up @@ -34,9 +33,6 @@ func Accept_Websocket(w http.ResponseWriter, r *http.Request, ip string, ua stri
defer cancel()

var relaySession = relayHandler.NewSession(ctx)

var once sync.Once

var s = Session{
ClientIP: ip,

Expand Down Expand Up @@ -84,7 +80,6 @@ listener:
continue listener
}

once.Do(s.Start)
s.ClientREQ <- json
case "CLOSE":
if len(json) < 2 {
Expand All @@ -99,7 +94,6 @@ listener:
continue listener
}

once.Do(s.Start)
s.ClientEVENT <- json
default:
wsjson.Write(ctx, conn, [2]string{"NOTICE", fmt.Sprintf("error: unknown command %s", cmd)})
Expand Down

0 comments on commit dd5dfcb

Please sign in to comment.