This repository has been archived by the owner on Sep 3, 2024. It is now read-only.

Fix possible msg corruption on a busy network #86

Merged

Conversation

andriytk
Contributor

The sendMany routine can send one msg via several c_writev calls
when the OS sending queues are busy. So in between these calls the
sending thread could be interrupted by some outer non-IO exception
(like ProcessLinkException), which caused msg corruption.

Now we call sendMany in a separate thread which is shielded from
outer exceptions.
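The shielding idea can be sketched with base-only primitives. This is a hypothetical minimal sketch of the technique, not the code in this PR: `shielded` and the demo `main` are made-up names, and the real patch applies the same child-thread trick around `sendMany`. Asynchronous exceptions thrown to the caller can only land at the interruptible `takeMVar`, while the child thread runs the action to completion.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, catch, throwIO)

-- Run an action in a fresh thread so that asynchronous exceptions
-- thrown to the caller cannot interrupt the action itself; the
-- caller just waits (interruptibly) for the result.
shielded :: IO a -> IO a
shielded act = do
  resVar <- newEmptyMVar
  _ <- forkIO $
         (act >>= putMVar resVar . Right)
           `catch` \e -> putMVar resVar (Left (e :: SomeException))
  res <- takeMVar resVar
  either throwIO return res

main :: IO ()
main = shielded (return (21 * 2 :: Int)) >>= print
```

The same effect can be had with `async`/`wait` from the async package; if the waiting thread is killed, the child still finishes its write.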

andriytk referenced this pull request May 14, 2019
Instead of using the schedule/runScheduled actions separately,
which are exception-unsafe, we introduce a
`withScheduledAction` function that takes care of the safety
by providing correct finalizers.

X-Bug-URL: https://cloud-haskell.atlassian.net/browse/DP-109
@andriytk
Contributor Author

andriytk commented May 14, 2019

I'm still testing this for haskell-distributed/distributed-process#341, so don't hurry to land it. But a review is warmly welcome!

@facundominguez
Contributor

Thanks Andriy for the patch. But IIUC, the child thread could be writing to the socket well beyond the point at which it has been closed. This could cause undefined behavior.

@andriytk
Contributor Author

If the socket is closed, sendMany will raise an IOException, no? And this exception will be rethrown from the Async.wait.

@facundominguez
Contributor

If the socket is closed, it will fire IOException from sendMany, no?

Last time we checked, the Haskell API to sockets wasn't safer than C's in this regard. And in C, passing a closed socket to a write or read call is undefined behavior. When hacking, I've seen the same file descriptor being reused for sockets that are opened after the old one is closed, for instance.

@andriytk
Contributor Author

Well, it seems the current code suffers from this issue as well, no? I mean, sendOn can be called on an already closed socket, so in this regard my patch does not seem to change anything. During debugging I implemented another patch for this particular issue, but then rolled it back because it did not seem very useful (the exception was always raised whenever there was some problem with the socket). Here it is:

--- a/network-transport-tcp/src/Network/Transport/TCP.hs
+++ b/network-transport-tcp/src/Network/Transport/TCP.hs
@@ -143,6 +143,7 @@ import Control.Exception
   , finally
   , catch
   , bracket
+  , mask
   , mask_
   )
 import Data.IORef (IORef, newIORef, writeIORef, readIORef, writeIORef)
@@ -446,7 +447,7 @@ data ValidRemoteEndPointState = ValidRemoteEndPointState
      -- | When the connection is being probed, yields an IO action that can be
      -- used to release any resources dedicated to the probing.
   ,  remoteProbing       :: Maybe (IO ())
-  ,  remoteSendLock      :: !(MVar ())
+  ,  remoteSendLock      :: !(MVar Bool)
      -- | An IO which returns when the socket (remoteSocket) has been closed.
      --   The program/thread which created the socket is always responsible
      --   for closing it, but sometimes other threads need to know when this
@@ -1024,7 +1025,7 @@ handleConnectionRequest transport socketClosed (sock, sockAddr) = handle handleE
               [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestCrossed)]
             probeIfValid theirEndPoint
           else do
-            sendLock <- newMVar ()
+            sendLock <- newMVar True
             let vst = ValidRemoteEndPointState
                         {  remoteSocket        = sock
                         ,  remoteSocketClosed  = socketClosed
@@ -1516,7 +1517,7 @@ setupRemoteEndPoint params (ourEndPoint, theirEndPoint) connTimeout = do
       -- (readMVar socketClosedVar), and we'll take care of closing it up
       -- once handleIncomingMessages has finished.
       Right (socketClosedVar, sock, ConnectionRequestAccepted) -> do
-        sendLock <- newMVar ()
+        sendLock <- newMVar True
         let vst = ValidRemoteEndPointState
                     {  remoteSocket        = sock
                     ,  remoteSocketClosed  = readMVar socketClosedVar
@@ -1874,8 +1875,17 @@ findRemoteEndPoint ourEndPoint theirAddress findOrigin mtimer = go
 
 -- | Send a payload over a heavyweight connection (thread safe)
 sendOn :: ValidRemoteEndPointState -> [ByteString] -> IO ()
-sendOn vst bs = withMVar (remoteSendLock vst) $ \() ->
-  sendMany (remoteSocket vst) bs
+sendOn vst bs =
+  mask $ \restore -> do
+    is_good <- takeMVar (remoteSendLock vst)
+    when (is_good) $ do
+      restore (sendMany (remoteSocket vst) bs) `catch` \ex -> do
+        putMVar (remoteSendLock vst) False
+        throwIO (ex :: IOException)
+    putMVar (remoteSendLock vst) is_good
 
 --------------------------------------------------------------------------------
 -- Scheduling actions                                                         --

Let me know if you'd like to land it so I prepare another PR.

@facundominguez
Contributor

Well, it seems the current code suffers from this issue as well, no?

Maybe. Though so far it looked to me like it wouldn't be possible. And there were some PRs in the past to reach the current state. How do you get the code in master to use the socket after it has been closed?

@andriytk
Contributor Author

andriytk commented May 14, 2019

runScheduledAction may be called when the remote endpoint is no longer in a valid state. For example, when several threads are blocked on sendOn's MVar and the current one fails with an IOException, the others will still try to execute their sendOn once the MVar is released; there is nothing that would stop them from doing this.
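The failure mode described here can be illustrated with a plain `MVar ()` lock (a minimal hypothetical sketch, not the actual network-transport-tcp code): after the first sender fails while holding the lock, `withMVar` restores the lock as usual, and the next queued sender proceeds as if nothing happened.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newMVar, newEmptyMVar, putMVar, takeMVar, withMVar)
import Control.Exception (IOException, throwIO, try)

main :: IO ()
main = do
  lock <- newMVar ()        -- plays the role of remoteSendLock
  done <- newEmptyMVar
  -- first "sender" fails with an IOException while holding the lock
  _ <- forkIO $ do
    r <- try (withMVar lock $ \() -> throwIO (userError "socket broke"))
    putMVar done (either (\e -> "sender1: " ++ show (e :: IOException)) id r)
  msg1 <- takeMVar done
  -- the lock was restored, so a second sender happily "uses the socket"
  _ <- forkIO $ withMVar lock $ \() -> putMVar done "sender2: sent on dead socket"
  msg2 <- takeMVar done
  putStrLn msg1
  putStrLn msg2
```

Nothing in the `MVar ()` records that the previous send failed, which is exactly what the `MVar Bool` in the patch above adds.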

@andriytk
Contributor Author

andriytk commented May 14, 2019

BTW, the proposed patch does not solve the haskell-distributed/distributed-process#341 issue - it has just reproduced again. :(

But let me try it with both patches combined...

@andriytk
Contributor Author

Well, with the two patches combined - so far so good...

@facundominguez
Contributor

runScheduledAction may be called when the remote endpoint is not in the valid state already.

Makes sense. This is related to haskell-distributed/distributed-process#440.

@andriytk andriytk changed the title Fix rare msg corruption on a busy network Fix msg corruption on a busy network May 16, 2019
@andriytk
Contributor Author

Actually, the haskell-distributed/distributed-process#341 issue does not reproduce even with the 2nd patch only (which prevents the socket usage after an IOException). So let me prepare it for landing separately.

@andriytk andriytk force-pushed the msg-corruption-fix branch 3 times, most recently from 5b08e7a to 1c65590 Compare May 16, 2019 09:30
@andriytk
Contributor Author

andriytk commented May 16, 2019

Ah, no - just reproduced it again with the 2nd patch only:

ssu7: May 16 12:50:23 ssu7 p[21399]: halond: Data.Binary.Get.runGet at position 586425: not enough bytes
ssu7: May 16 12:50:23 ssu7 p[21399]: error, called at libraries/binary/src/Data/Binary/Get.hs:351:5 in binary-0.8.5.1:Data.Binary.Get
ssu1: May 16 12:50:23 ssu1 p[23118]: sendOn: ex=ProcessLinkException pid://172.28.128.3:9070:0:312406 DiedNormal
ssu1: May 16 12:50:23 ssu1 p[23118]: sendOn: ex=ProcessLinkException pid://172.28.128.3:9070:0:312406 DiedNormal

So we do need them both: sendMany should not be disturbed by outer exceptions (like ProcessLinkException) and the socket should not be re-used after an inner IOException.
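Combined, the two fixes can be sketched as below. This is a hypothetical, simplified `safeSend` (not the code merged in this PR): the Bool lock poisons the "socket" after an inner IOException, the real send runs in a child thread so outer exceptions aimed at the caller cannot interrupt it mid-message, and the child itself releases the lock so an interrupted caller cannot let the next sender in early.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newMVar, newEmptyMVar, putMVar, readMVar, takeMVar)
import Control.Exception (IOException, mask, throwIO, try, userError)

-- 'lock' holds True while the "socket" is usable.
safeSend :: MVar Bool -> IO () -> IO ()
safeSend lock doSend =
  mask $ \restore -> do
    ok <- takeMVar lock
    if not ok
      then do
        putMVar lock False
        throwIO (userError "socket already failed")
      else do
        resVar <- newEmptyMVar
        _ <- forkIO $ do
               res <- try doSend :: IO (Either IOException ())
               -- the child, not the caller, releases the lock, so an
               -- interrupted caller cannot hand the socket over early
               putMVar lock (either (const False) (const True) res)
               putMVar resVar res
        -- the only interruptible point for the caller; the child
        -- finishes its send regardless
        res <- restore (takeMVar resVar)
        either throwIO return res

main :: IO ()
main = do
  lock <- newMVar True
  safeSend lock (return ())                              -- succeeds
  r <- try (safeSend lock (throwIO (userError "broken")))
         :: IO (Either IOException ())
  good <- readMVar lock
  putStrLn (either (const "send failed") (const "sent") r)
  print good
```

After the failing send, the lock holds False and any further `safeSend` on this "socket" refuses to run.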

@andriytk andriytk force-pushed the msg-corruption-fix branch 2 times, most recently from ab27f44 to 64bc367 Compare May 16, 2019 15:55
@andriytk andriytk changed the title Fix msg corruption on a busy network Fix possible msg corruption on a busy network May 16, 2019
@andriytk
Contributor Author

The patch works.

@andriytk
Contributor Author

Updated the comment in the latest patch.

@facundominguez
Contributor

Thanks @andriytk! I'll merge it soon.

The non-atomic sendMany routine from the network pkg can send
one msg via several c_writev calls when the OS sending queues
are busy. So in between these calls the sending thread could
be interrupted by an inner IOException (when, for example,
the connection is broken by the TCP user timeout) or by an outer
exception (like ProcessLinkException). Concurrent threads (which
scheduled their sends before the exception, on a still valid remote
endpoint) could then re-use the socket and start sending a new msg,
corrupting the old msg on the receiving side.

Now we prevent the socket usage in sendOn after an IOException
(with the help of the same MVar which was already there, so we
have it for free). And we call sendMany in a separate thread
which is not targeted (and thus, not interrupted) by the outer
exceptions. (Haskell threads are cheap, aren't they?)
@facundominguez facundominguez merged commit b738345 into haskell-distributed:master May 23, 2019
@andriytk andriytk deleted the msg-corruption-fix branch May 23, 2019 20:49