Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report streaming progress #540

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 73 additions & 30 deletions fission-cli/library/Fission/CLI/Handler/App/Publish.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,30 @@ import Fission.CLI.Remote
import Fission.CLI.Environment (MonadEnvironment)
import Fission.CLI.WebNative.Mutation.Auth.Store as UCAN






import Control.Monad.Except
import Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as TLS
import qualified RIO.ByteString as BS
import Servant.Client
import qualified Servant.Client.Streaming as Stream
import Servant.Types.SourceT









import Fission.BytesReceived.Types
import qualified RIO.Text as Text

-- | Sync the current working directory to the server over IPFS
publish ::
( MonadIO m
Expand Down Expand Up @@ -88,50 +112,69 @@ publish
= do
logDebug @Text "📱 App publish"
attempt (App.readFrom appPath) >>= \case
Left err -> do
CLI.Error.put err "App not set up. Please double check your path, or run `fission app register`"
raise err
-- Left err -> do
-- CLI.Error.put err "App not set up. Please double check your path, or run `fission app register`"
-- raise err

Right App.Env {buildDir} -> do
absBuildPath <- liftIO $ makeAbsolute buildDir
logDebug $ "📱 Starting single IPFS add locally of " <> displayShow absBuildPath
logUser @Text "🛫 App publish local preflight"

CLI.IPFS.Add.dir (UTF8.wrapIn "\"" absBuildPath) >>= \case
Left err -> do
CLI.Error.put' err
raise err
-- Left err -> do
-- CLI.Error.put' err
-- raise err

Right cid@(CID hash) -> do
logDebug $ "📱 Directory CID is " <> hash
_ <- IPFS.Daemon.runDaemon
proof <- getRootUserProof
req <- App.update appURL cid (Just updateData) <$> Client.attachAuth proof
-- req <- App.update appURL cid (Just updateData) <$> Client.attachAuth proof
req <- App.streamingUpdate appURL cid <$> Client.attachAuth proof

logUser @Text "✈️ Pushing to remote"
retryOnStatus [status502] 100 req >>= \case
Left err -> do
CLI.Error.put err "Server unable to sync data"
raise err

Right _ -> do
ipfsGateway <- getIpfsGateway

when open do
liftIO . void . openBrowser $ ipfsGateway <> "/" <> show appURL

when watching do
liftIO $ FS.withManager \watchMgr -> do
now <- getCurrentTime
(hashCache, timeCache) <- atomically do
hashCache <- newTVar hash
timeCache <- newTVar now
return (hashCache, timeCache)

void $ handleTreeChanges runner proof appURL updateData timeCache hashCache watchMgr absBuildPath
liftIO . forever $ threadDelay 1_000_000 -- Sleep main thread

success appURL
logUser @Text "✈️FUR EEL"


req `streamWith` \stream -> do
BS.putStr "Inside"
case stream of
Left err -> logUser $ "NOOOOOOO" <> show err
Right stream -> do
liftIO $ foreach
(\errStr -> BS.putStr . encodeUtf8 . Text.pack $ "BAD: " <> errStr)
(\good -> BS.putStr . encodeUtf8 . Text.pack $ "GOOD: " <> show good) stream
-- result :: Either String [BytesReceived] <- liftIO . runExceptT $ runSourceT stream
-- case result of
-- Left err -> logUser $ show err
-- Right list ->
-- logUser $ show list

-- return ()
-- retryOnStatus [status502] 100 req >>= \case
-- Left err -> do
-- CLI.Error.put err "Server unable to sync data"
-- raise err
--
-- Right _ -> do
-- ipfsGateway <- getIpfsGateway
--
-- when open do
-- liftIO . void . openBrowser $ ipfsGateway <> "/" <> show appURL
--
-- when watching do
-- liftIO $ FS.withManager \watchMgr -> do
-- now <- getCurrentTime
-- (hashCache, timeCache) <- atomically do
-- hashCache <- newTVar hash
-- timeCache <- newTVar now
-- return (hashCache, timeCache)
--
-- void $ handleTreeChanges runner proof appURL updateData timeCache hashCache watchMgr absBuildPath
-- liftIO . forever $ threadDelay 1_000_000 -- Sleep main thread
--
-- success appURL

handleTreeChanges ::
( MonadIO m
Expand Down
11 changes: 11 additions & 0 deletions fission-cli/library/Fission/CLI/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import qualified "wss-client" Network.WebSockets.Client as WS
import Servant.API hiding
(IsMember)
import Servant.Client
import qualified Servant.Client.Streaming as Streaming
import qualified Wuss as WSS

import Fission.Prelude hiding (mask,
Expand Down Expand Up @@ -184,6 +185,16 @@ instance

liftIO . runClientM req . mkClientEnv manager $ toBaseUrl remote

streamWith req handler = do
manager <- asks $ getField @"httpManager"
remote <- getRemote

let env = mkClientEnv manager $ toBaseUrl remote

control \runInBase ->
Streaming.withClientM req env \resp ->
runInBase $ handler resp

instance
( Contains errs errs
, Display (OpenUnion errs)
Expand Down
2 changes: 1 addition & 1 deletion fission-cli/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: fission-cli
version: '2.15.1.0'
version: '2.16.0.0'
category: CLI
author:
- Brooklyn Zelenka
Expand Down
38 changes: 38 additions & 0 deletions fission-core/library/Fission/BytesReceived/Types.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- FIXME move to web api or higher
module Fission.BytesReceived.Types (BytesReceived (..)) where

import Data.Swagger hiding (URL, url)

import Fission.Prelude

import qualified RIO.Text as Text

newtype BytesReceived = BytesReceived { byteCount :: Natural }
deriving (Show, Eq)

instance ToJSON BytesReceived where
toJSON BytesReceived {..} = object [ "byteCount" .= byteCount ]

instance FromJSON BytesReceived where
parseJSON val =
trace (Text.pack $ "---> " <> show val) (
val |> withArray "BytesReceived" \arr -> do
trace (Text.pack $ show arr) $
return $ BytesReceived (fromIntegral $ length arr)
)
-- withObject "BytesReceived" \obj -> do
-- byteCount <- obj .: "byteCount"
-- return BytesReceived {..}

instance ToSchema BytesReceived where
declareNamedSchema _ = do
bytesSchema <- declareSchemaRef $ Proxy @Natural

mempty
|> type_ ?~ SwaggerObject
|> properties .~ [("byteCount", bytesSchema)]
|> required .~ ["byteCount"]
|> description ?~ "Properties for a registered application"
|> example ?~ toJSON (BytesReceived 42)
|> NamedSchema (Just "BytesReceived")
|> pure
5 changes: 3 additions & 2 deletions fission-core/library/Fission/Web/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ waitAnySuccessCatchCancel asyncRefs = do

-- | Wait for all cluster peers to complete.
waitAll :: MonadIO m => NonEmpty (Async (Either ClientError a)) -> m (NonEmpty (Either ClientError a))
waitAll asyncRefs = liftIO $ forConcurrently asyncRefs \ref ->
normalizeResult <$> waitCatch ref
waitAll asyncRefs =
liftIO $ forConcurrently asyncRefs \ref ->
normalizeResult <$> waitCatch ref

normalizeResult :: Either SomeException (Either ClientError a) -> Either ClientError a
normalizeResult = \case
Expand Down
9 changes: 7 additions & 2 deletions fission-web-api/library/Fission/Web/API/App/Types.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
module Fission.Web.API.App.Types (App) where
module Fission.Web.API.App.Types (App, NonStreaming, Streaming) where

import Fission.Web.API.Prelude

import Fission.Web.API.App.Create.Types
import Fission.Web.API.App.Destroy.Types
import Fission.Web.API.App.Index.Types
import Fission.Web.API.App.Update.Streaming.Types
import Fission.Web.API.App.Update.Types

type App = "app" :> API
type App = NonStreaming :<|> Streaming

type NonStreaming = "app" :> API
type Streaming = "app" :> "streaming" :> StreamingUpdate

type API = Index :<|> Create :<|> Update :<|> Destroy
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Fission.Web.API.App.Update.Streaming.Types where

import qualified Network.IPFS.CID.Types as IPFS
import Servant.API

import Fission.BytesReceived.Types
import Fission.URL.Types
import qualified Fission.Web.API.Auth.Types as Auth

type StreamingUpdate
= Summary "Set app content & stream upload progress"
:> Description "Update the content (CID) of an app & stream the progress"
--
:> Capture "App URL" URL
:> Capture "New CID" IPFS.CID
--
:> Auth.HigherOrder
:> Stream 'PATCH 200 NewlineFraming JSON (SourceIO BytesReceived)
1 change: 1 addition & 0 deletions fission-web-api/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ dependencies:

## Data ##
- swagger2
- streamly
- uuid

## Fission
Expand Down
8 changes: 6 additions & 2 deletions fission-web-client/library/Fission/Web/Client/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ module Fission.Web.Client.App
( index
, create
, update
, streamingUpdate
, destroy
) where

import qualified Servant.Client.Streaming as Streaming

import Fission.Web.API.Prelude

import Fission.Web.API.App.Types
import qualified Fission.Web.API.App.Types as App

index :<|> create :<|> update :<|> destroy = client $ Proxy @App
index :<|> create :<|> update :<|> destroy = client $ Proxy @App.NonStreaming
streamingUpdate = Streaming.client $ Proxy @App.Streaming
4 changes: 4 additions & 0 deletions fission-web-client/library/Fission/Web/Client/Class.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
module Fission.Web.Client.Class (MonadWebClient (..)) where

import Servant.API
import Servant.Client
import qualified Servant.Client.Streaming as Streaming

import Fission.Prelude

class Monad m => MonadWebClient m where
sendRequest :: ClientM a -> m (Either ClientError a)
streamWith :: Streaming.ClientM (SourceIO a) -> (Either ClientError (SourceIO a) -> m b) -> m b
7 changes: 1 addition & 6 deletions fission-web-client/library/Fission/Web/Client/Error.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import Fission.Web.Client
import Network.HTTP.Types.Status
import Servant.Client


retryOnStatus ::
( MonadWebClient m
, MonadLogger m
Expand All @@ -22,11 +21,7 @@ retryOnStatus ::
retryOnStatus retryOn times req =
retryOnErr (checkStatus retryOn) times (sendRequest req)

checkStatus ::
MonadLogger m
=> [Status]
-> Either ClientError a
-> m Bool
checkStatus :: MonadLogger m => [Status] -> Either ClientError a -> m Bool
checkStatus retryOn = \case
Right _ ->
return True
Expand Down
2 changes: 1 addition & 1 deletion fission-web-client/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: fission-web-client
version: '1.0.0.0'
version: '1.1.0.0'
category: Client
author:
- Brooklyn Zelenka
Expand Down
Loading