server/comms: fix synchronization issues and drop sleeps in tests #435
Channel patterns are used throughout. There are still 2 time.Sleeps, one for response handler timeout and one for checking that ping-pongs are working.
client/rpcserver/rpcserver.go
Outdated
// Run starts the web server. Satisfies the dex.Runner interface.
func (s *RPCServer) Run(ctx context.Context) {
// Connect starts the RPC server. Satisfies the dex.Connector interface.
func (s *RPCServer) Connect(ctx context.Context) (error, *sync.WaitGroup) {
BTW, I'm going to reverse the order of these return values. I'm just trying to avoid any more conflicts than necessary with #408 for now.
TBH, I'm thinking this PR goes in before #408, even though that creates some resolution work for @kevinstl. I think having the canonical pattern laid out first is good. How would you prefer to proceed, @buck54321?
c.sendWG.Add(1)
c.outChan <- b
go func() {
	c.writeMtx.Lock()
	b := <-c.outChan
	c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	err := c.conn.WriteMessage(websocket.TextMessage, b)
	c.writeMtx.Unlock()
	c.sendWG.Done()
	if err != nil {
		c.Disconnect()
	}
}()
This pattern is quite different than before, when there was a persistent goroutine to handle pings and outgoing messages. Doing it like this with the sendWG sync.WaitGroup allows us to have a "graceful" shutdown, and not be racy with any messages sent right before Disconnect is called. The outChan is still used to sequence the sends.
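For readers who want to try the pattern in isolation, here is a minimal sketch of it. The link type, the write callback standing in for conn.WriteMessage, the Wait method, the failed flag, and the buffer size of 128 are all assumptions made for illustration, not the code in this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// link is a hypothetical stand-in for the websocket link in this PR. The
// write field plays the role of conn.WriteMessage.
type link struct {
	writeMtx sync.Mutex
	sendWG   sync.WaitGroup
	outChan  chan []byte
	write    func([]byte) error
	failed   bool // set under writeMtx when a write fails
}

func newLink(write func([]byte) error) *link {
	// The buffer size is an arbitrary choice for this sketch.
	return &link{outChan: make(chan []byte, 128), write: write}
}

// Send queues b and starts a goroutine that writes the oldest queued message.
// It returns before the write happens, unless the channel buffer is full.
func (l *link) Send(b []byte) {
	l.sendWG.Add(1)
	l.outChan <- b // the channel fixes the order of the writes
	go func() {
		defer l.sendWG.Done()
		l.writeMtx.Lock()
		defer l.writeMtx.Unlock()
		msg := <-l.outChan // pop under the lock so pops and writes stay in step
		if err := l.write(msg); err != nil {
			l.failed = true // the code in this PR calls Disconnect on error
		}
	}()
}

// Wait blocks until every message passed to Send has been written.
func (l *link) Wait() { l.sendWG.Wait() }

func main() {
	var sent []string
	l := newLink(func(b []byte) error {
		sent = append(sent, string(b)) // safe: write runs under writeMtx
		return nil
	})
	for _, m := range []string{"a", "b", "c"} {
		l.Send([]byte(m))
	}
	l.Wait() // a graceful shutdown would wait here before closing the conn
	fmt.Println(sent)
}
```

Because each goroutine pops from the channel while holding the mutex, it does not matter which goroutine wins the lock: the message written is always the oldest one queued.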
@@ -34,14 +37,51 @@ func newServer() *Server {
	}
}

func giveItASecond(f func() bool) bool {
Even though we are eliminating time.Sleep for synchronization, there are still some times where a channel isn't convenient or would add significant complexity. giveItASecond can be used to sync to some arbitrary condition, e.g. the client's presence in the server's client map, with a 1 ms delay between checks.
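The body of giveItASecond is not shown in this thread, so the following is only a guess at how such a helper could look. The signature and the 1 ms polling interval come from the discussion above; the one-second timeout is an assumption based on the name.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// giveItASecond polls f every millisecond. It returns true as soon as f does,
// or false if f has not returned true after one second (assumed timeout).
func giveItASecond(f func() bool) bool {
	deadline := time.After(time.Second)
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		if f() {
			return true
		}
		select {
		case <-deadline:
			return false
		case <-ticker.C:
			// Time for the next check.
		}
	}
}

func main() {
	// A flag flipped by another goroutine stands in for a condition such as
	// "the client is present in the server's client map".
	var ready int32
	go func() {
		time.Sleep(20 * time.Millisecond)
		atomic.StoreInt32(&ready, 1)
	}()
	ok := giveItASecond(func() bool { return atomic.LoadInt32(&ready) == 1 })
	fmt.Println("condition met:", ok)
}
```

Unlike a bare time.Sleep, the test proceeds as soon as the condition holds and only pays the full second when something is actually wrong.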
// Send the message if their is a receiver for the current test.
if conn.recv != nil {
	conn.recv <- msg
}
conn.write++
write wasn't being used.
ListenAddrs: []string{portAddr},
ListenAddrs: []string{":0"},
RPCKey: keyPath,
RPCCert: certPath,
})
if err != nil {
	t.Fatalf("server constructor error: %v", err)
}
address := "wss://" + server.listeners[0].Addr().String() + "/ws"
This allows testing in parallel.
Working well for me.
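The port 0 trick can be seen on its own with the standard net package. This is a small sketch, not the server code from this PR: listenAny is a hypothetical helper, and it binds to 127.0.0.1:0 rather than ":0" so that it only listens on loopback.

```go
package main

import (
	"fmt"
	"net"
)

// listenAny opens a TCP listener on a free port chosen by the OS and returns
// the listener together with the address it actually bound to.
func listenAny() (net.Listener, string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, "", err
	}
	return ln, ln.Addr().String(), nil
}

func main() {
	// Two listeners opened at the same time get different ports, which is
	// what lets tests that each start a server run in parallel.
	ln1, addr1, err := listenAny()
	if err != nil {
		fmt.Println("listen error:", err)
		return
	}
	defer ln1.Close()
	ln2, addr2, err := listenAny()
	if err != nil {
		fmt.Println("listen error:", err)
		return
	}
	defer ln2.Close()
	fmt.Println("first listener: ", addr1)
	fmt.Println("second listener:", addr2)
}
```

The test then builds its client URL from the listener's address, as the diff above does with server.listeners[0].Addr().String().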
go func() {
	c.writeMtx.Lock()
	b := <-c.outChan
	c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	err := c.conn.WriteMessage(websocket.TextMessage, b)
	c.writeMtx.Unlock()
	c.sendWG.Done()
	if err != nil {
		c.Disconnect()
	}
}()
This looks a bit more memory intensive. Is that ok? I was recently scolded by @davecgh in dcrd for trying to create a goroutine for all rpc requests sent to the server.
Although goroutines are not terribly heavy, we will be churning goroutines quite a bit this way.
To summarize what we're trying to accomplish here: with a sending goroutine that receives on outChan as before, we run the risk of filling that channel's buffer in Send. Relying on a channel buffer was convenient, but Send might block if that buffer fills. That maybe is not too bad, because the quit channel was the arbiter of connection status and messages were not dropped when the channel was full, just blocked. EDIT: That is a problem, as one bad client can hold up the caller for several seconds.
We realized that a queue for the outgoing messages might be needed. The present solution solves the queuing issue via the Go scheduler, with a goroutine created for each message sent, at the cost of goroutine churn. This may become a problem; I don't know.
Other solutions:
- A more complex implementation is to append the messages, under lock or in a for-select loop, to a slice of queued outgoing messages, while the same goroutine doing the append pops entries off and sends them. It's complex because a slice is not a synchronization mechanism. The epochPump is an example, and that's pretty hairy. What might work is a for-select with cases for (1) quit, (2) append to queue, and (3) write message from head/first.
- Have each Send block until it can actually WriteMessage itself. This would involve acquiring a semaphore (using a channel with buffer length 1, not a mutex, so that it can be selected with quit in another case) or checking quit after the mutex is locked, and it would make Send blocking...
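The for-select idea from the first option could be sketched roughly as follows. This is not the epochPump or anything in the repository: pump and its three channels are hypothetical names, and case (3) hands the head of the queue to a separate writer over a channel rather than calling WriteMessage inside the loop.

```go
package main

import "fmt"

// pump keeps an unbounded queue of outgoing messages in a slice that only
// this goroutine touches, so no lock is needed.
func pump(in <-chan []byte, out chan<- []byte, quit <-chan struct{}) {
	var queue [][]byte
	for {
		// A nil channel is never ready in a select, so the send case below
		// is switched off while the queue is empty.
		var head []byte
		var send chan<- []byte
		if len(queue) > 0 {
			head, send = queue[0], out
		}
		select {
		case <-quit: // (1) quit
			return
		case msg := <-in: // (2) append to queue
			queue = append(queue, msg)
		case send <- head: // (3) hand the head of the queue to the writer
			queue = queue[1:]
		}
	}
}

func main() {
	in := make(chan []byte)
	out := make(chan []byte)
	quit := make(chan struct{})
	go pump(in, out, quit)

	// The sends return promptly even though nothing is reading from out yet.
	for _, m := range []string{"a", "b", "c"} {
		in <- []byte(m)
	}
	// A writer goroutine would loop here and call WriteMessage for each one.
	for i := 0; i < 3; i++ {
		fmt.Println(string(<-out))
	}
	close(quit)
}
```

The cost is one long-lived goroutine per connection plus a writer, and a queue with no upper bound unless one is added.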
I actually think having Send block might be desirable so that the caller gets the error from WriteMessage, whereas now nil does not mean the send was successful, just that marshalling was successful and the message is now queued for send.
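A blocking Send along those lines might look like the sketch below. Everything here is hypothetical (blockingLink, errClosed, errWriteFailed, and the write callback standing in for WriteMessage); it only illustrates the buffer-length-1 channel used as a semaphore that can be selected together with quit.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errClosed      = errors.New("link closed")
	errWriteFailed = errors.New("write failed") // used to simulate a bad write
)

// blockingLink serializes writes with a one-slot channel instead of a mutex,
// so that waiting for a turn can be abandoned when quit is closed.
type blockingLink struct {
	sem   chan struct{}
	quit  chan struct{}
	write func([]byte) error
}

func newBlockingLink(write func([]byte) error) *blockingLink {
	return &blockingLink{
		sem:   make(chan struct{}, 1),
		quit:  make(chan struct{}),
		write: write,
	}
}

// Send blocks until it is this caller's turn, writes b, and returns the
// write error, so nil means the message was actually written.
func (l *blockingLink) Send(b []byte) error {
	// Check quit first so a closed link never starts a new write.
	select {
	case <-l.quit:
		return errClosed
	default:
	}
	select {
	case l.sem <- struct{}{}: // acquire
		defer func() { <-l.sem }() // release
	case <-l.quit:
		return errClosed
	}
	return l.write(b)
}

// Close must be called at most once in this sketch.
func (l *blockingLink) Close() { close(l.quit) }

func main() {
	fail := false
	l := newBlockingLink(func(b []byte) error {
		if fail {
			return errWriteFailed
		}
		return nil
	})
	fmt.Println(l.Send([]byte("ok")))
	fail = true
	fmt.Println(l.Send([]byte("bad")))
	l.Close()
	fmt.Println(l.Send([]byte("late")))
}
```

The trade-off discussed in this thread still applies: with this shape a slow client holds up the caller of Send for as long as the write takes.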
Anyway, we've chatted on matrix and I think we can run with this (a goroutine for each message sent) for now. A requirement with Send is that a misbehaving client cannot gum up the works for up to writeWait (seconds). Would we like each connection to have a single goroutine for the life of the connection? Yes, but I think that using a goroutine for each message to handle this concurrency requirement (non-blocking sends) is appropriate given the complexity of alternative solutions for message queuing.
ConnectionMaster refactoring works for me. Also, I think we can run with the Send modification that uses a new goroutine as a means to schedule sends without blocking the Send caller. We can consider alternatives that do not require this, but it should work.
server/comms/server.go
Outdated
@@ -341,12 +349,14 @@ func (s *Server) disconnectClients() {
}

// addClient assigns the client an ID and adds it to the map.
Let's also add that it now attempts connection.
dex/ws/wslink.go
Outdated
defer c.Disconnect()
defer c.wg.Done()
Last in, first out, so defer c.Disconnect() should come after defer c.wg.Done() to ensure Disconnect gets called.
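A tiny standalone program shows the ordering in question. The strings stand in for the real calls, and deferOrder is a hypothetical helper written only for this illustration.

```go
package main

import "fmt"

// deferOrder records the order in which two deferred calls run. Deferred
// calls run last in, first out when the function returns.
func deferOrder() (order []string) {
	defer func() { order = append(order, "wg.Done") }()    // deferred first, runs last
	defer func() { order = append(order, "Disconnect") }() // deferred last, runs first
	return nil
}

func main() {
	fmt.Println(deferOrder()) // prints [Disconnect wg.Done]
}
```

With the defers in this order, Disconnect has already run by the time anything waiting on the WaitGroup is released.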
Resolves #394
Eliminate sloppy time.Sleep sync patterns in tests. Improves WSLink.Send synchronization so that a graceful shutdown will wait for messages to finish sending.