streaming asynchronous exception #104

Open
tim2CF opened this issue May 15, 2020 · 20 comments

@tim2CF

tim2CF commented May 15, 2020

Hi! I'm using the async package to manage my threads. In many places the race function is very handy. For example:

res <- race receiveLongServerStream executeSomethingShort

where receiveLongServerStream is a client action which receives events from a gRPC server (any number of events; it is basically an infinite thread)

and executeSomethingShort is a short-lived action which should normally return a value first. race should then cancel receiveLongServerStream and return res, which is supposed to be Right because executeSomethingShort returned first.

But for some reason the whole expression blocks, probably because race can't cancel receiveLongServerStream. Here are the docs for the cancel function, in case they help:

https://hackage.haskell.org/package/async-2.2.2/docs/Control-Concurrent-Async.html#v:cancel

I just want to know how I can deal with this, because this behaviour is causing deadlocks.

@Gabriella439
Contributor

@tim2CF: My first instinct when I see an unkillable thread is that something is blocked on an unsafe FFI call

@tim2CF
Author

tim2CF commented May 15, 2020

The gRPC C code is probably causing this, right? receiveLongServerStream in my example is a client-side listener, basically a subscription to server events with an attached IO handler, and it has a very long timeout (for example 1 hour), because it doesn't make sense to reopen the connection for events which I want to always listen for and handle asap:

https://github.com/coingaming/lnd-client/blob/fe0f9560ee0f85107f8520cc8088eea3aceac4fc/src/LndClient/RPC.hs#L362-L366

In some cases I can replace race with some combination of link =<< (async this).
But a thread that race cannot cancel was quite a surprise for me; I spent a couple of hours tracking it down)

@Gabriella439
Contributor

Yeah, I suspect an FFI call in the binding to the grpc package is responsible for this. It might either be in the logic for the server itself or in some finalizer which is triggered when the thread is cancelled and then the finalizer is not cancellable.

The second most common reason for this is finalization logic that inappropriately masks exceptions, leading to Haskell code that blocks and is uninterruptible.
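
To make the masking point concrete, here is a minimal sketch using only base (not gRPC-haskell code). The helper `timeKill` is a hypothetical name introduced for illustration: it forks a worker that sleeps for 0.3 s inside a given wrapper and measures how long `killThread` blocks. Under `mask_` the sleep is still an interruptible operation, so the kill should land promptly; under `uninterruptibleMask_` the kill should only be delivered once the region ends.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (mask_, uninterruptibleMask_)
import GHC.Clock (getMonotonicTime)

-- | Hypothetical helper: fork a worker that sleeps for 0.3 s inside the
-- given wrapper, then return how many seconds 'killThread' blocked.
-- 'killThread' is synchronous: it returns only once the target thread
-- has actually received the exception.
timeKill :: (IO () -> IO ()) -> IO Double
timeKill wrapper = do
  started <- newEmptyMVar
  tid <- forkIO $ wrapper $ do
    putMVar started ()   -- signal that the worker is inside the wrapper
    threadDelay 300000   -- 0.3 s of "work"
  takeMVar started       -- wait until the worker is inside the wrapper
  t0 <- getMonotonicTime
  killThread tid
  t1 <- getMonotonicTime
  pure (t1 - t0)

-- Wrappers to compare: 'id' (no masking), 'mask_', 'uninterruptibleMask_'.
wrappers :: [(String, IO () -> IO ())]
wrappers =
  [ ("unmasked", id)
  , ("mask_", mask_)
  , ("uninterruptibleMask_", uninterruptibleMask_)
  ]
```

Running `mapM (timeKill . snd) wrappers` should show the first two kills returning almost immediately and the third taking roughly the full 0.3 s. A thread blocked in an ordinary foreign call behaves like the third case for as long as the call lasts.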

@tim2CF
Author

tim2CF commented Jun 9, 2020

Thanks @Gabriel439!
A slightly unrelated question: is there a good way to handle subscription misbehaviour when the handler receives unexpected values? Here is an example:

https://github.com/coingaming/lnd-client/blob/6760cbf23ad351b982fefad5585199eca818943b/src/LndClient/RPC.hs#L207-L212

There my gRPC stream handler throws an exception, simply because I have no idea how to terminate the subscription from the handler when it receives corrupted or unexpected data. If there is a nice way to do this without exceptions, I would like to use it.

@Gabriella439
Contributor

@tim2CF: There is a chance that I might have fixed the issue you are running into in the course of fixing #105

@tim2CF
Author

tim2CF commented Jun 10, 2020

Thanks! Any thoughts about properly terminating a gRPC subscription from inside the handler?
The type signature doesn't give me any hints, because the handler function returns an IO () value:

ClientReaderRequest :: request -> TimeoutSeconds -> MetadataMap -> (LL.ClientCall -> MetadataMap -> StreamRecv response -> IO ()) -> ClientRequest 'ServerStreaming request response

So I have no idea how to terminate the stream from the handler if it gets corrupted data.

@Gabriella439
Contributor

@tim2CF: I'm not sure, but have you tried manually terminating by not executing any more IO actions within the handler? In other words, in the event of some sort of corruption you stop reading data?

If you need something a bit more high-level you could do something like this:

https://hackage.haskell.org/package/break-1.0.2/docs/Control-Break.html
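
The first suggestion above (stop reading once something looks wrong) can be sketched roughly as follows. This is an illustration under stated assumptions, not the gRPC-haskell API: `Recv` is a hypothetical, simplified stand-in for the library's `StreamRecv`, and `StopReason`, `consume`, `subscribe` and `listRecv` are names invented for this example. The handler still has type `IO ()`; it ends the loop by simply not recursing, and records why it stopped in an `IORef` so the caller can inspect the reason afterwards.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)

-- Hypothetical, simplified stand-in for the library's StreamRecv:
-- 'Nothing' means the server closed the stream.
type Recv a = IO (Maybe a)

data StopReason = StreamEnded | Corrupted String
  deriving (Eq, Show)

-- | Handler body: keep reading while values validate; on the first bad
-- value, record the reason and return without reading any further.
consume :: IORef StopReason -> (a -> Either String ()) -> Recv a -> IO ()
consume reasonRef validate recv = loop
  where
    loop = do
      next <- recv
      case next of
        Nothing -> writeIORef reasonRef StreamEnded
        Just x -> case validate x of
          Left err -> writeIORef reasonRef (Corrupted err) -- stop: no recursive call
          Right () -> loop

-- | Run the handler and report why it stopped.
subscribe :: (a -> Either String ()) -> Recv a -> IO StopReason
subscribe validate recv = do
  reasonRef <- newIORef StreamEnded
  consume reasonRef validate recv
  readIORef reasonRef

-- | Test double: a 'Recv' that replays a fixed list of values.
listRecv :: [a] -> IO (Recv a)
listRecv xs = do
  ref <- newIORef xs
  pure $ atomicModifyIORef' ref $ \ys -> case ys of
    [] -> ([], Nothing)
    (z : zs) -> (zs, Just z)
```

Whether the real subscription shuts down cleanly when the handler returns early depends on the library; the sketch only shows the control flow on the Haskell side.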

@tim2CF
Author

tim2CF commented Jun 10, 2020

So if I don't call the stream handler recursively, the subscription will just terminate?
The bad thing about the IO () type is that there is no clear mechanism for propagating the reason for termination to the outside. For now I'm using Control.Exception for this: I throw an exception from inside the subscription handler

https://github.com/coingaming/lnd-client/blob/b5e29ce9218b65b692afe706705964f739206c98/src/LndClient/RPC.hs#L441-L442

and catch it from outside, at the level of the subscription:

https://github.com/coingaming/lnd-client/blob/b5e29ce9218b65b692afe706705964f739206c98/src/LndClient/RPC.hs#L349

It works, so it doesn't seem to be an issue at all. I was just a bit confused because I don't use exceptions in Haskell on an everyday basis. But thanks for the reply!
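
The throw-inside, catch-outside pattern described in this comment might look roughly like the sketch below. It is not the code behind the links: `StreamAborted`, `runStream`, `handler` and `subscribe` are hypothetical names, and `runStream` merely stands in for a streaming call that feeds each received value to an `IO ()` handler.

```haskell
import Control.Exception (Exception, throwIO, try)

-- | Hypothetical exception type carrying the reason for termination.
newtype StreamAborted = StreamAborted String
  deriving (Eq, Show)

instance Exception StreamAborted

-- | Hypothetical stand-in for the streaming call: it feeds every
-- received item to the handler, which can only return ().
runStream :: [Int] -> (Int -> IO ()) -> IO ()
runStream items handle = mapM_ handle items

-- | Handler: aborts the whole stream on an unexpected value.
handler :: Int -> IO ()
handler n
  | n < 0 = throwIO (StreamAborted ("unexpected value: " ++ show n))
  | otherwise = pure ()

-- | Subscription level: turn the exception back into an ordinary value.
subscribe :: [Int] -> IO (Either StreamAborted ())
subscribe items = try (runStream items handler)
```

Using a dedicated exception type means `try` only intercepts the handler's own aborts; any other exception still propagates to the caller.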

@Gabriella439
Contributor

@tim2CF: My understanding is that subscription will terminate when the callback you supply to ClientReaderRequest terminates

That said, I think you should be able to safely throw an exception inside of the handler. If that causes issues then we can always look into that

@tim2CF
Author

tim2CF commented Jun 11, 2020

Thanks!

@Gabriella439
Contributor

@tim2CF: You're welcome! 🙂

@tim2CF
Author

tim2CF commented Jan 28, 2021

Hi again, @Gabriel439! Is it possible that a gRPC subscription client is still not cancellable from the client side with the cancel function? I recently made some improvements to my library, trying to use withAsync + link instead of async + link for a cleaner runtime, but found that this test hangs exactly where the Async is being cancelled. Using a simple cancel like here https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/test/LndClient/TestApp.hs#L196-L209 or the better withAsync version like here https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/test/LndClient/TestApp.hs#L210-L224 makes no difference; this line is never reached: https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/test/LndClient/TestApp.hs#L210-L224

As I remember, you were talking about cancellation from the server side, but here is an example of how I'm trying to cancel a client async subscription from the client side as well. I'm using the latest version of your gRPC package with the previous fix already included, but for some reason it's still not working.

@tim2CF
Author

tim2CF commented Jan 28, 2021

In my examples I'm using my own spawnLink and withSpawnLink, but they are just generic wrappers around async + link and withAsync + link https://github.com/coingaming/lnd-client/blob/a1cae565f843e6ebbcd7734129463a72370bc81f/src/LndClient/Util.hs#L66-L81

I don't think this difference can cause problems; I think it would behave the same way with the original functions from the async package.

@tim2CF
Author

tim2CF commented Jan 29, 2021

As additional info, it seems that a thread spawned like this is not cancellable. Can that be true?

-- spawn the gRPC subscription in a new async thread
pid <- async $ withGRPCClient config $ \client -> do
  blablabla
-- this does not seem to take effect until the async thread has exited on its own
cancel pid
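
One possible way to keep the caller from hanging in a situation like this is to request cancellation from a helper thread instead of waiting for it. The sketch below uses only base; `cancelInBackground` and `spawnStubborn` are hypothetical names, and the "stubborn" worker is simulated with `uninterruptibleMask_` rather than a real gRPC call. It is a workaround for the caller only: the worker still runs until it becomes interruptible.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.MVar
import Control.Exception (finally, uninterruptibleMask_)
import Control.Monad (void)

-- | Request cancellation without waiting for it to be delivered.
-- 'killThread' blocks until the target receives the exception, so it is
-- run in a short-lived helper thread instead of the caller's thread.
cancelInBackground :: ThreadId -> IO ()
cancelInBackground tid = void (forkIO (killThread tid))

-- | Simulated stubborn worker (an assumption for this sketch): it cannot
-- be interrupted for the given number of microseconds. The returned MVar
-- is filled once the worker has really exited.
spawnStubborn :: Int -> IO (ThreadId, MVar ())
spawnStubborn micros = do
  started <- newEmptyMVar
  done <- newEmptyMVar
  tid <- forkIO $
    uninterruptibleMask_ (putMVar started () >> threadDelay micros)
      `finally` putMVar done ()
  takeMVar started -- make sure the worker is inside the masked region
  pure (tid, done)
```

The trade-off is that any resources held by the worker (connections, file handles) stay alive until it actually exits, and combinators such as race or withAsync always wait for the cancelled thread, so this only helps where cancellation is issued explicitly.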

@tim2CF
Author

tim2CF commented Jan 29, 2021

I compiled grpc-haskell with the debug flag and found that the subscription thread hangs after this log output:

[ThreadId 35]: runOps: allocated op contexts: [OpRecvMessageContext 0x00007f9bc8000b70]
[ThreadId 35]: runOps: tag: Tag {unTag = 0x8000000000000012}
[ThreadId 35]: startBatch: calling grpc_call_start_batch with pointers: Call 0x00007f9bc40238a0 OpArray 0x00007f9bb0001300
[ThreadId 35]: startBatch: grpc_call_start_batch call returned.
[ThreadId 35]: runOps: called start_batch. callError: Right ()
[ThreadId 35]: pluck: called with tag=Tag {unTag = 0x8000000000000012},mwait=Nothing
[ThreadId 35]: pluck: blocking on grpc_completion_queue_pluck for tag=Tag {unTag = 0x8000000000000012}

and then it hangs on the cancel line of the high-level code I described before. After a significant timeout (a couple of minutes) it resumes with these logs:

[ThreadId 35]: pluck finished: Event {eventCompletionType = OpComplete, eventSuccess = True, eventTag = Tag {unTag = 0x8000000000000012}}
C wrapper: timespec_destroy: freeing ptr: 0x7f9bb0000e90
[ThreadId 35]: runOps: pluck returned Right ()
[ThreadId 35]: runOps: got good op; starting.
[ThreadId 35]: resultFromOpContext: OpRecvMessageContext
C wrapper: free_slice: freeing ptr: 0x7f9bb0000ed0
C wrapper: byte_buffer_reader_destroy: freeing ptr: 0x7f9bb0000e90
[ThreadId 35]: resultFromOpContext: bb copied: "\n\ENQHELLO\SUB

and then finally shutting down/cancelling (without delay)

[ThreadId 35]: debugCall: client call: 0x00007f9bc40238a0
[ThreadId 35]: withClientCall(R): destroying.
[ThreadId 35]: Destroying client-side call object.
[ThreadId 35]: withClient: destroying.
[ThreadId 35]: destroyClient: calling grpc_channel_destroy()
[ThreadId 35]: destroyClient: shutting down CQ.
[ThreadId 35]: drainLoop: before next() call
C wrapper: timespec_destroy: freeing ptr: 0x7f9bc40122b0
[ThreadId 35]: drainLoop: next() call got Event {eventCompletionType = QueueShutdown, eventSuccess = False, eventTag = Tag {unTag = 0xe573bccf5e727f00}}
[ThreadId 35]: Got CQ loop shutdown result of: Just ()
[ThreadId 35]: withGRPC: shutting down
[2021-01-29 19:54:07][LndClient][Info][d7fe5ea939ca][PID 29037][ThreadId 26][main:LndClient.TestApp test/LndClient/TestApp.hs:226:7] BOOOOOOOMMMMMMMMMMMMMMMM

I hope this helps you understand what's going on there and why the thread is not cancelled immediately (I still think it could be a fault on my side, in how I'm using the gRPC library) @Gabriel439

@tim2CF
Author

tim2CF commented Jan 30, 2021

I took a look at the gRPC-haskell source code, and given my issue with cancelling a long-running client subscription and the logs produced in debug mode, it might be because of this application of mask_. I'm a real noob in low-level FFI questions, but this place might be the cause:

withOpArrayAndCtxts ops $ \(opArray, contexts) -> mask_ $ do

What do you think?

@Gabriella439
Contributor

Gabriella439 commented Feb 1, 2021

@tim2CF: Yes, use of mask_ is generally an anti-pattern and a likely source of threads not handling exceptions gracefully. I will take a look at this soon

@tim2CF
Author

tim2CF commented Feb 1, 2021

Cool, thanks Gabriel! Sorry for the spam. This library seems to be the only complete gRPC library for Haskell, and I had no idea how to fix the issue.

@cocreature
Collaborator

fwiw, I added the mask_ in #89. I fully agree that it’s not great, but it does work around an actual issue. I’d be very happy to see a better solution here.

@Gabriella439
Contributor

@cocreature: Actually, it's not clear to me why the mask_ is present. I've read the associated comment a few times and I still don't understand the sequence of events that leads to the bug that the mask_ is intended to prevent. In particular, I don't understand how gRPC can write to the freed ByteBuffer if an exception is thrown in the middle of startBatch

RichardWarfield pushed a commit to litxio/gRPC-haskell that referenced this issue Apr 25, 2023
This PR moves the non-Haskell specific code generator utilities from `DotProto.Generate` to `DotProto.Internal`. I've gone through the `DotProto.Generate` module and made stylistic improvements---the changes are largely cosmetic. This refactor also paves the way for a Purescript code generator feature that I'm hacking on.

Major changes: 

- Pass args struct to `compileDotProtoFile` runner
- Enforce that `Path` is nonempty using `List.NonEmpty`. This removes a case from `CompileError`
- Nice combinators for writing `importDecl_` statements (see `defaultImports`)
- Pass file name to parsec; report file name in parser errors
- Make `DotProtoServiceRPC` args a separate type
- Clarify codegen steps using `foldMapM` and `foldMapOfM`. This might also improve efficiency by causing separate traversal steps (e.g. map -> sequence -> mconcat) to be done in a single pass, depending on what ghc was doing with the original code...
- Improve readability and organisation of large codegen functions

The version has been updated to `0.3.0.1` to reflect minor changes in types and dependencies.