Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the race condition in StreamingWebSocketClient #739

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

snake-4
Copy link
Contributor

@snake-4 snake-4 commented Nov 20, 2021

  • Fixed the race condition in StreamingWebSocketClient
  • Made StreamingWebSocketClient.StopAsync and StreamingWebSocketClient.StartAsync thread-safe
  • Included the changes in WebSocketClient, fixed starvation condition #738
  • Fixed the problem where the _listener task exited immediately thus couldn't be waited. Read here for more information.
  • The listener now quits when the WebSocket disconnects.

Please don't add null checks to the HandleIncomingMessagesAsync function as the StopAsync function now properly waits for the listener to exit, doing so would make actual bugs harder to catch.
The function also must not check the _clientWebSocket's state as the listener should be stopped once the connection is dropped. A properly implemented code would call the StartAsync in the Error handler if the aim is to reconnect on Errors.

made StopAsync and StartAsync thread-safe. Implemented the fix for Nethereum#738. Fixed the problem where the _listener task completed immediately. Also, the listener now quits when the WebSocket disconnects.
@juanfranblanco
Copy link
Member

As per other threads.. (and new people coming over) there was other pull request working on this, which include similar changes, but keeping this open just in case it helps anyone else.

@snake-4
Copy link
Contributor Author

snake-4 commented Jul 1, 2023

As per other threads.. (and new people coming over) there was other pull request working on this, which include similar changes, but keeping this open just in case it helps anyone else.

I've checked the up-to-date code as of now, and all of the problems that I've mentioned still exist.

  1. StopAsync and StartAsync are not thread-safe, the _startStopSemaphore in my commit was for that purpose.
  2. Race condition still exists, that means; _listener task may start, error out and exit before the connection could be made.
  3. Task.Factory.StartNew call's return value is still not unwrapped.

There was also something to do with not being able to call StartAsync in the user-defined error handler; I made the HandleIncomingMessagesAsync not wait for HandleError call to finish, to counter that.
To be honest, this PR was quite a long time ago and I don't remember if there were other things. It'd be nice if you could check the changes again.

@juanfranblanco
Copy link
Member

Thank you!! I lost track of the improvements (or not tested correctly most probably!)

@cysnet
Copy link

cysnet commented Dec 20, 2023

@juanfranblanco I think it exactly one issue will cause the HIGH CPU USEAGE. I create a dump and below is the callstask.


OS Thread Id: 0x16d82f (18)
        Child SP               IP Call Site
00007F40A13F11A0 00007F4327811D09 System.Threading.SemaphoreSlim.WaitAsync(Int32, System.Threading.CancellationToken) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/SemaphoreSlim.cs @ 610]
00007F40A13F11F0 00007F432665D45B Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<HandleIncomingMessagesAsync>d__48.MoveNext()
00007F40A13F12C0 00007F4327813D8E System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<HandleIncomingMessagesAsync>d__48, Nethereum.JsonRpc.WebSocketClient]].ExecutionContextCallback(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 286]
00007F40A13F12E0 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F1330 00007F4327813829 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<HandleIncomingMessagesAsync>d__48, Nethereum.JsonRpc.WebSocketClient]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F1380 00007F4327813729 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<HandleIncomingMessagesAsync>d__48, Nethereum.JsonRpc.WebSocketClient]].MoveNext() [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 302]
00007F40A13F13A0 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F13E0 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F1460 00007F432783F4A5 System.Threading.Tasks.Task`1[[System.__Canon, System.Private.CoreLib]].TrySetResult(System.__Canon) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Future.cs @ 400]
00007F40A13F1490 00007F4326C2C64F Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveFullResponseAsync>d__46.MoveNext()
00007F40A13F1510 00007F43278135C0 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveFullResponseAsync>d__46, Nethereum.JsonRpc.WebSocketClient]].ExecutionContextCallback(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 286]
00007F40A13F1550 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F15A0 00007F43278133B0 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveFullResponseAsync>d__46, Nethereum.JsonRpc.WebSocketClient]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F1620 00007F4327813269 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveFullResponseAsync>d__46, Nethereum.JsonRpc.WebSocketClient]].MoveNext() [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 302]
00007F40A13F1640 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F1680 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F1700 00007F432783F4A5 System.Threading.Tasks.Task`1[[System.__Canon, System.Private.CoreLib]].TrySetResult(System.__Canon) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Future.cs @ 400]
00007F40A13F1730 00007F432785B039 Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveBufferedResponseAsync>d__44.MoveNext()
00007F40A13F17C0 00007F43278131E0 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveBufferedResponseAsync>d__44, Nethereum.JsonRpc.WebSocketClient]].ExecutionContextCallback(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 286]
00007F40A13F1800 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F1850 00007F4327812FD0 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveBufferedResponseAsync>d__44, Nethereum.JsonRpc.WebSocketClient]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F18D0 00007F4327812E89 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib],[Nethereum.JsonRpc.WebSocketStreamingClient.StreamingWebSocketClient+<ReceiveBufferedResponseAsync>d__44, Nethereum.JsonRpc.WebSocketClient]].MoveNext() [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 302]
00007F40A13F18F0 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F1930 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F19B0 00007F432783F4A5 System.Threading.Tasks.Task`1[[System.__Canon, System.Private.CoreLib]].TrySetResult(System.__Canon) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Future.cs @ 400]
00007F40A13F19E0 00007F4322211519 System.Threading.Tasks.ValueTask`1+ValueTaskSourceAsTask+<>c[[System.__Canon, System.Private.CoreLib]].<.cctor>b__4_0(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ValueTask.cs @ 638]
00007F40A13F1A30 00007F4322211AA5 System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1[[System.__Canon, System.Private.CoreLib]].SignalCompletion() [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @ 233]
00007F40A13F1A60 00007F4322211630 System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1[[System.__Canon, System.Private.CoreLib]].SetResult(System.__Canon) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @ 62]
00007F40A13F1A80 00007F43222131D0 System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1+StateMachineBox[[System.__Canon, System.Private.CoreLib]].SetResult(System.__Canon) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/PoolingAsyncValueTaskMethodBuilderT.cs @ 250]
00007F40A13F1AB0 00007F4322212F62 System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1[[System.__Canon, System.Private.CoreLib]].SetResult(System.__Canon) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/PoolingAsyncValueTaskMethodBuilderT.cs @ 60]
00007F40A13F1AD0 00007F432785D127 System.Net.WebSockets.ManagedWebSocket+<ReceiveAsyncPrivate>d__63`1[[System.__Canon, System.Private.CoreLib]].MoveNext()
00007F40A13F1DA0 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F1DF0 00007F432785D81D System.Net.WebSockets.ManagedWebSocket+<EnsureBufferContainsAsync>d__74.MoveNext() [/_/src/libraries/System.Net.WebSockets/src/System/Net/WebSockets/ManagedWebSocket.cs @ 1374]
00007F40A13F1E90 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F1EE0 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F1F20 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F1FA0 00007F4327842512 System.Threading.Tasks.Task`1[[System.Int32, System.Private.CoreLib]].TrySetResult(Int32) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Future.cs @ 400]
00007F40A13F1FE0 00007F432785DE1E System.Net.Http.HttpConnection+RawConnectionStream+<ReadAsync>d__6.MoveNext() [/_/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs @ 88]
00007F40A13F20B0 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F2100 00007F432787155A System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Int32, System.Private.CoreLib],[System.Net.Http.HttpConnection+RawConnectionStream+<ReadAsync>d__6, System.Net.Http]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F2150 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F2190 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F2210 00007F43278423D4 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.Int32, System.Private.CoreLib]].SetExistingTaskResult(System.Threading.Tasks.Task`1<Int32>, Int32) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 446]
00007F40A13F2250 00007F432785E750 System.Net.Http.HttpConnection+<ReadBufferedAsyncCore>d__108.MoveNext() [/_/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs @ 1859]
00007F40A13F2350 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F23A0 00007F432787134A System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Int32, System.Private.CoreLib],[System.Net.Http.HttpConnection+<ReadBufferedAsyncCore>d__108, System.Net.Http]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F23F0 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F2430 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F24B0 00007F43278423D4 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.Int32, System.Private.CoreLib]].SetExistingTaskResult(System.Threading.Tasks.Task`1<Int32>, Int32) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 446]
00007F40A13F24F0 00007F4327853977 System.Net.Security.SslStream+<ReadAsyncInternal>d__188`1[[System.Net.Security.AsyncReadWriteAdapter, System.Net.Security]].MoveNext() [/_/src/libraries/System.Net.Security/src/System/Net/Security/SslStream.Implementation.cs @ 1066]
00007F40A13F2670 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F26C0 00007F43278640EA System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Int32, System.Private.CoreLib],[System.Net.Security.SslStream+<ReadAsyncInternal>d__188`1[[System.Net.Security.AsyncReadWriteAdapter, System.Net.Security]], System.Net.Security]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F2710 00007F432782E3F1 System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskContinuation.cs @ 795]
00007F40A13F2750 00007F432782DDDB System.Threading.Tasks.Task.RunContinuations(System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @ 3374]
00007F40A13F27D0 00007F43278423D4 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.Int32, System.Private.CoreLib]].SetExistingTaskResult(System.Threading.Tasks.Task`1<Int32>, Int32) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 446]
00007F40A13F2810 00007F432784B7A5 System.Net.Security.SslStream+<EnsureFullTlsFrameAsync>d__186`1[[System.Net.Security.AsyncReadWriteAdapter, System.Net.Security]].MoveNext() [/_/src/libraries/System.Net.Security/src/System/Net/Security/SslStream.Implementation.cs @ 865]
00007F40A13F28B0 00007F432782AC99 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) [/_/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs @ 183]
00007F40A13F2900 00007F4327863EDA System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Int32, System.Private.CoreLib],[System.Net.Security.SslStream+<EnsureFullTlsFrameAsync>d__186`1[[System.Net.Security.AsyncReadWriteAdapter, System.Net.Security]], System.Net.Security]].MoveNext(System.Threading.Thread) [/_/src/libraries/System.Private.CoreLib/src/System/Runtime/CompilerServices/AsyncTaskMethodBuilderT.cs @ 324]
00007F40A13F2950 00007F4327858B1D System.Net.Sockets.Socket+AwaitableSocketAsyncEventArgs.InvokeContinuation(System.Action`1<System.Object>, System.Object, Boolean, Boolean) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs @ 1242]
00007F40A13F2990 00007F43278589B4 System.Net.Sockets.Socket+AwaitableSocketAsyncEventArgs.OnCompleted(System.Net.Sockets.SocketAsyncEventArgs) [/_/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs @ 912]
00007F40A13F29E0 00007F432784FD41 System.Net.Sockets.SocketAsyncEngine.System.Threading.IThreadPoolWorkItem.Execute()
00007F40A13F2A60 00007F4327848571 System.Threading.ThreadPoolWorkQueue.Dispatch()
00007F40A13F2AE0 00007F4327889624 System.Threading.PortableThreadPool+WorkerThread.WorkerThreadStart() [/_/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @ 63]
00007F40A13F2CF0 00007f439bd0df77 [DebuggerU2MCatchHandlerFrame: 00007f40a13f2cf0]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants