Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate streaming client #1066

Merged
merged 2 commits into from Nov 9, 2018

Conversation

@phadej
Copy link
Member

commented Nov 1, 2018

Follow up to #991

  • write changelog

@phadej phadej force-pushed the phadej:separate-streaming-client branch from c0a9935 to c6fcc2d Nov 1, 2018

@alpmestan

This comment has been minimized.

Copy link
Contributor

commented Nov 1, 2018

@lucasdicioccio See the last commit here, this is quite relevant to your http2 client. We're leaning towards putting the streaming client "runner" in a separate class, where the streaming stuffs is exposed under the withXXX pattern, CPS-style.

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 1, 2018

Looking at servant-http2-client code, the current performStreamingRequest function already uses withHttp2Stream, so adaptation should be straight-forward.

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 1, 2018

From cd18ac60233a93f76908d121c7ffbb36e35d9489 Mon Sep 17 00:00:00 2001
From: Oleg Grenrus <oleg.grenrus@iki.fi>
Date: Thu, 1 Nov 2018 20:20:18 +0200
Subject: [PATCH] Adapt to stream changes

---
 src/Network/HTTP2/Client/Servant.hs | 27 ++++++++++++++-------------
 test/Spec.hs                        | 16 ++++++----------
 2 files changed, 20 insertions(+), 23 deletions(-)

diff --git a/src/Network/HTTP2/Client/Servant.hs b/src/Network/HTTP2/Client/Servant.hs
index f3d198c..4dd8932 100644
--- a/src/Network/HTTP2/Client/Servant.hs
+++ b/src/Network/HTTP2/Client/Servant.hs
@@ -56,11 +56,13 @@ runH2ClientM cm env = runExceptT $ flip runReaderT env $ unH2ClientM cm
 instance RunClient H2ClientM where
   runRequest :: Request -> H2ClientM Response
   runRequest = performRequest
-  streamingRequest :: Request -> H2ClientM StreamingResponse
-  streamingRequest = performStreamingRequest
   throwServantError :: ServantError -> H2ClientM a
   throwServantError = throwError
 
+instance RunStreamingClient H2ClientM where
+  withStreamingRequest :: Request -> (StreamingResponse -> IO r) -> H2ClientM r
+  withStreamingRequest = performStreamingRequest
+
 data H2ClientEnv = H2ClientEnv ByteString Http2Client
 
 type ByteSegments = (IO ByteString -> IO ()) -> IO ()
@@ -208,32 +210,31 @@ replenishFlowControls icfc isfc len = do
     pure ()
 
 -- | Implementation of requests with streaming replies.
-performStreamingRequest :: Request -> H2ClientM StreamingResponse
-performStreamingRequest req = do
+performStreamingRequest :: Request -> (StreamingResponse -> IO r) -> H2ClientM r
+performStreamingRequest req kont = do
     H2ClientEnv authority http2client <- ask
     let icfc = _incomingFlowControl http2client
     let ocfc = _outgoingFlowControl http2client
     (headersPairs, mBodyIO) <- liftIO $ makeRequest authority req
     let headersFlags = if isJust mBodyIO then id else setEndStream
     ret <- liftIO $ withHttp2Stream http2client $ \stream ->
-        let initStream =
-                headers stream headersPairs headersFlags
+        let initStream = headers stream headersPairs headersFlags
             handler isfc osfc = do
                 -- Send the request
                 flip traverse_ mBodyIO (\bodyIO -> do
                     sendNonEmptySegments http2client stream ocfc osfc bodyIO
                     sendData http2client stream setEndStream "")
+
                 -- Waits for headers and returns the response object to the
                 -- caller.
-                pure $ StreamingResponse (\handleGenResponse -> do
-                    ev <- _waitEvent stream
-                    case ev of
-                        (StreamHeadersEvent fh hdrs) -> handleHeaders stream icfc isfc fh hdrs handleGenResponse
-                        _ -> throwIO $ Servant.Client.Core.ConnectionError $ "unwanted event received in data stream" <> Text.pack (show ev)
-                        )
+                ev <- _waitEvent stream
+                case ev of
+                    (StreamHeadersEvent fh hdrs) -> handleHeaders stream icfc isfc fh hdrs kont
+                    _ -> throwIO $ Servant.Client.Core.ConnectionError $ "unwanted event received in data stream" <> Text.pack (show ev)
+
         in (StreamDefinition initStream handler)
     case ret of
-        Right streamingResp -> pure streamingResp
+        Right r -> return r
         Left (TooMuchConcurrency _) ->
             throwError $ Servant.Client.Core.ConnectionError "too many concurrent streams"
   where
diff --git a/test/Spec.hs b/test/Spec.hs
index fd7419c..01f52db 100644
--- a/test/Spec.hs
+++ b/test/Spec.hs
@@ -19,6 +19,7 @@ import           Network.HTTP2.Client.Servant
 import qualified Network.TLS as TLS
 import qualified Network.TLS.Extra.Cipher as TLS
 import           Servant.API
+import           Servant.Types.SourceT (foreach)
 import           Servant.Client.Core
 
 data RawText
@@ -33,14 +34,14 @@ instance MimeUnrender OctetStream Text where
 
 type Http2Golang =
        "reqinfo" :> Get '[RawText] Text
-  :<|> "goroutines" :> StreamGet NewlineFraming RawText (ResultStream Text)
+  :<|> "goroutines" :> StreamGet NewlineFraming RawText (SourceIO Text)
   :<|> "ECHO" :> ReqBody '[RawText] Text :> Put '[OctetStream] Text
 
 myApi :: Proxy Http2Golang
 myApi = Proxy
 
 getExample :: H2ClientM Text
-getGoroutines :: H2ClientM (ResultStream Text)
+getGoroutines :: H2ClientM (SourceIO Text)
 capitalize :: Text -> H2ClientM Text
 getExample :<|> getGoroutines :<|> capitalize = h2client myApi
 
@@ -55,15 +56,10 @@ main = do
     runH2ClientM getGoroutines env >>= go
     _close client
   where
-    go :: Either ServantError (ResultStream Text) -> IO ()
+    go :: Either ServantError (SourceIO Text) -> IO ()
     go (Left err) = print err
-    go (Right (ResultStream k)) = k $ \getResult ->
-       let loop = do
-            r <- getResult
-            case r of
-                Nothing -> return ()
-                Just x -> print x >> loop
-       in loop
+    go (Right sourceIO) = foreach fail print sourceIO
+
     tlsParams = TLS.ClientParams {
           TLS.clientWantSessionResume    = Nothing
         , TLS.clientUseMaxFragmentLength = Nothing
-- 
2.17.1
@lucasdicioccio

This comment has been minimized.

Copy link

commented Nov 1, 2018

Thanks for the heads-up Alp. And thanks @phadej for the patch.

The withHttp2Stream actually doesn't steal the control (a first design did that but now it only steals the thread control for a short while to enforce an HTTP2 invariant on new-stream-IDs). Further, this function doesn't automatically close the stream and a power user can escape everything they want from the 2nd method. Hence, CSP-style should work. I'll need a few more iterations to find a better design (I was thinking adding a module with type-indexed state-machines).

I'll have more time during next week.

@phadej phadej force-pushed the phadej:separate-streaming-client branch from c6fcc2d to 999fc50 Nov 1, 2018

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 1, 2018

@lucasdicioccio I realised that (non stealing control). The with name is confusing. Anyway, my patch compiles and tests "pass"; but I have no idea if things works as they should. I don't know HTTP2 at all.

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 1, 2018

@lucasdicioccio you should look at this branch StreamingBody. Maybe you can use it somehow to make duplex communication. That would be very cool!

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 1, 2018

I also run benchmarks (streaming-benchmark.sh) and cannot spot the difference between this and before. So I'm heavily inclined to merge this for 0.15

@domenkozar

This comment has been minimized.

Copy link
Contributor

commented Nov 4, 2018

Really good work (as always) :)

I have a few questions, although b nor c should block the merge.

a) How does one mix&match Servant.Client.Generic with streaming, if streaming endpoints require a different ClientM?

b) Does the streaming client handle HTTP redirects?

c) Putting the monad into the API sometimes has devastating results, for example api package needs to depend on amazonka for AWST. Does the monad that server operate on really depend in the api, or maybe it should've been a type family in the api and attached via Context for the servant-server?

PS: I'm going to test this with cachix (client side) and report back.

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 5, 2018

@domenkozar

These is something we'd need to add to changelog / tutorial:

a) Servant.Client.Streaming works with non-streaming endpoints too. You need to work with all endpoints using withClientM though.

b) It works as underlying http-clients withResponse works. So probably yes.

c) I don't understand the question.

@phadej phadej referenced this pull request Nov 5, 2018
12 of 12 tasks complete

@phadej phadej force-pushed the phadej:separate-streaming-client branch from 999fc50 to 5dd198e Nov 5, 2018

@phadej phadej added this to the 0.15 milestone Nov 5, 2018

@domenkozar

This comment has been minimized.

Copy link
Contributor

commented Nov 5, 2018

@phadej for (c) see plow-technologies/servant-streaming#11

In short, the monad in which servant-server operates for streaming doesn't really belong in the API description as it's the implementation detail of the server (similarly for the client).

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 5, 2018

there are no StreamBodyMonad....

We have StreamingBody now: See https://haskell-servant.readthedocs.io/en/latest/cookbook/basic-streaming/Streaming.html (I don't understand how this is related to servant-client refactor?)

@phadej phadej force-pushed the phadej:separate-streaming-client branch 3 times, most recently from cccd7b9 to ac5fa65 Nov 5, 2018

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 8, 2018

@alpmestan could you re-review this. I rebased the branch, and there are few small changes because of recent changes in master (I think in good direction).

Separate Servant.Client.Streaming
- as a bonus only `servant-client` depends on `kan-extensions`

@phadej phadej force-pushed the phadej:separate-streaming-client branch from ac5fa65 to 8feda81 Nov 8, 2018

:<|> "capture-all" :> CaptureAll "foo" Int :> GET
:<|> "summary" :> Summary "foo" :> GET
:<|> "description" :> Description "foo" :> GET
:<|> "alternative" :> ("left" :> GET :<|> "right" :> GET)

This comment has been minimized.

Copy link
@phadej

phadej Nov 8, 2018

Author Member

this case is added to make a (small) tree, as otherwise the structure of ComprehensiveAPI is linear.

@domenkozar

This comment has been minimized.

Copy link
Contributor

commented Nov 8, 2018

I'd like to bring up another UX issue, but maybe I don't have the full picture.

Suppose one is developing a big servant application with many routes. Once streaming is added, it changes client calls from runClientM to withClientM - one has to rewrite all client calls. Previously mixing the two was possible. I think this makes sense if eventually we'd rather get rid of runClientM (and rewrite the documentation).

@domenkozar

This comment has been minimized.

Copy link
Contributor

commented Nov 9, 2018

I'd like to rephrase as my message was too implicit.

Currently default client has "runClientM" and streaming client has "withClientM". We have to think about the story of migration, which is to pick what we recommend to the users of servant to do:

a) if you want to use streaming use withClientM, otherwise runClientM, switch back and forth depending what you need

b) withClientM is the new CPS way, if you happen to use streaming you'll want to migrate right away otherwise take time but runClientM will eventually be the deprecated old way

The main question is, do we really still need runClientM and should we add withClientM also the default client?

@phadej phadej merged commit 97bd6f0 into haskell-servant:master Nov 9, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

@phadej phadej deleted the phadej:separate-streaming-client branch Nov 9, 2018

@phadej

This comment has been minimized.

Copy link
Member Author

commented Nov 9, 2018

  • runClientM is way simpler for tasks where it's enough (i.e. no Stream), so I'm 👎 for removing it.
  • one can write withClientM for Servant.Client but I don't want encourage use of it. If one really needs it , it's easy to write inline.

You can use both.

See and comment in #1072

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.