Skip to content

Commit

Permalink
Merge branch 'develop' into klntsky/583-update-odc-2
Browse files Browse the repository at this point in the history
  • Loading branch information
klntsky committed Jun 24, 2022
2 parents 2ed679c + 94b15f7 commit d40d8f7
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 87 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -30,12 +30,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Changed

- Updated `ogmios-datum-cache` - bug fixes (#542, #526).
- Updated `ogmios-datum-cache` - bug fixes (#542, #526, #589).
- Improved error response handling for Ogmios (#584)

### Fixed

- Handling of invalid UTF8 byte sequences in the Aeson instance for `TokenName`.
- `Types.ScriptLookups.require` function naming caused problems with WebPack (#593)

## [1.0.1] - 2022-06-17

Expand Down
4 changes: 2 additions & 2 deletions flake.nix
Expand Up @@ -190,8 +190,8 @@
controlApiToken = "";
blockFetcher = {
firstBlock = {
slot = 54066900;
id = "6eb2542a85f375d5fd6cbc1c768707b0e9fe8be85b7b1dd42a85017a70d2623d";
slot = 61625527;
id = "3afd8895c7b270f8250b744ec8d2b3c53ee2859c9d5711d906c47fe51b800988";
};
autoStart = true;
startFromLast = false;
Expand Down
50 changes: 33 additions & 17 deletions src/JsWebSocket.js
Expand Up @@ -19,37 +19,49 @@ class NoPerMessageDeflateWebSocket extends OurWebSocket {

// _mkWebsocket :: (String -> Effect Unit) -> String -> Effect WebSocket
exports._mkWebSocket = logger => url => () => {
logger("Starting websocket attempt")();
var ws;
if (typeof BROWSER_RUNTIME != 'undefined' && BROWSER_RUNTIME) {
ws = new ReconnectingWebSocket.default(url);
} else {
ws = new ReconnectingWebSocket(url, [], {
WebSocket: NoPerMessageDeflateWebSocket
});
}
logger("new websocket")();
return ws;
try {
var ws;
if (typeof BROWSER_RUNTIME != 'undefined' && BROWSER_RUNTIME) {
ws = new ReconnectingWebSocket.default(url);
} else {
ws = new ReconnectingWebSocket(url, [], {
WebSocket: NoPerMessageDeflateWebSocket
});
}
logger("Created a new WebSocket")();
return ws;
} catch (e) {
logger("Failed to create a new WebSocket");
throw e;
};
};

// _onWsConnect :: WebSocket -> (Unit -> Effect Unit) -> Effect Unit
exports._onWsConnect = ws => fn => () => {
exports._onWsConnect = ws => fn => () =>
ws.addEventListener('open', fn);
};

// _onWsError
// :: WebSocket
// -> (String -> Effect Unit) -- logger
// -> (String -> Effect Unit) -- handler
// -> Effect Unit
// -> Effect ListenerRef
exports._onWsError = ws => logger => fn => () => {
ws.addEventListener('error', function func(event) {
const listener = function (event) {
const str = event.toString();
logger(`error: ${str}`)();
logger(`WebSocket error: ${str}`)();
fn(str)();
});
};
ws.addEventListener('error', listener);
return listener;
};

// _removeOnWsError
// :: JsWebSocket
// -> ListenerRef
// -> Effect Unit
exports._removeOnWsError = ws => listener => () =>
ws.removeEventListener('error', listener);

// _onWsMessage
// :: WebSocket
// -> (String -> Effect Unit) -- logger
Expand All @@ -69,6 +81,10 @@ exports._wsSend = ws => logger => str => () => {
ws.send(str);
};

exports._wsReconnect = ws => () => {
ws.reconnect();
};

// _wsClose :: WebSocket -> Effect Unit
exports._wsClose = ws => () => ws.close();

Expand Down
18 changes: 17 additions & 1 deletion src/JsWebSocket.purs
@@ -1,11 +1,14 @@
module JsWebSocket
( JsWebSocket
, ListenerRef
, Url
, _mkWebSocket
, _onWsConnect
, _onWsError
, _removeOnWsError
, _onWsMessage
, _wsSend
, _wsReconnect
, _wsClose
, _wsWatch
) where
Expand All @@ -19,14 +22,18 @@ import Effect (Effect)
--------------------------------------------------------------------------------
foreign import data JsWebSocket :: Type

-- | Opaque listener reference that allows to cancel a listener
foreign import data ListenerRef :: Type

type Url = String

foreign import _mkWebSocket
:: (String -> Effect Unit)
-> Url
-> Effect JsWebSocket

foreign import _onWsConnect :: JsWebSocket -> (Effect Unit) -> Effect Unit
foreign import _onWsConnect
:: JsWebSocket -> (Effect Unit) -> Effect Unit

foreign import _onWsMessage
:: JsWebSocket
Expand All @@ -38,11 +45,20 @@ foreign import _onWsError
:: JsWebSocket
-> (String -> Effect Unit) -- logger
-> (String -> Effect Unit) -- handler
-> Effect ListenerRef

-- | Call `removeEventListener` for a given listener.
foreign import _removeOnWsError
:: JsWebSocket
-> ListenerRef
-> Effect Unit

foreign import _wsSend
:: JsWebSocket -> (String -> Effect Unit) -> String -> Effect Unit

foreign import _wsReconnect
:: JsWebSocket -> Effect Unit

foreign import _wsClose :: JsWebSocket -> Effect Unit

foreign import _wsWatch
Expand Down
160 changes: 108 additions & 52 deletions src/QueryM.purs
Expand Up @@ -52,9 +52,7 @@ import Prelude
import Aeson
( class DecodeAeson
, Aeson
, JsonDecodeError
( TypeMismatch
)
, JsonDecodeError(TypeMismatch)
, caseAesonString
, decodeAeson
, encodeAeson
Expand Down Expand Up @@ -95,7 +93,7 @@ import Data.UInt as UInt
import Deserialization.FromBytes (fromBytes) as Deserialization
import Deserialization.Transaction (convertTransaction) as Deserialization
import Effect (Effect)
import Effect.Aff (Aff, Canceler(Canceler), makeAff)
import Effect.Aff (Aff, Canceler(Canceler), delay, launchAff_, makeAff)
import Effect.Aff.Class (liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error, error, throw)
Expand All @@ -110,13 +108,13 @@ import JsWebSocket
, _onWsConnect
, _onWsError
, _onWsMessage
, _removeOnWsError
, _wsClose
, _wsReconnect
, _wsSend
, _wsWatch
)
import QueryM.DatumCacheWsp
( GetDatumByHashR
, GetDatumsByHashesR
)
import QueryM.DatumCacheWsp (GetDatumByHashR, GetDatumsByHashesR)
import QueryM.DatumCacheWsp as DcWsp
import QueryM.JsonWsp (parseJsonWspResponseId)
import QueryM.JsonWsp as JsonWsp
Expand Down Expand Up @@ -165,11 +163,7 @@ import Types.PubKeyHash (PaymentPubKeyHash, PubKeyHash, StakePubKeyHash)
import Types.Scripts (PlutusScript)
import Types.UsedTxOuts (newUsedTxOuts, UsedTxOuts)
import Untagged.Union (asOneOf)
import Wallet
( Wallet(Gero, Nami, KeyWallet)
, Cip30Connection
, Cip30Wallet
)
import Wallet (Wallet(Gero, Nami, KeyWallet), Cip30Connection, Cip30Wallet)

-- This module defines an Aff interface for Ogmios Websocket Queries
-- Since WebSockets do not define a mechanism for linking request/response
Expand Down Expand Up @@ -591,7 +585,7 @@ mkOgmiosWebSocket'
-> ServerConfig
-> (Either Error OgmiosWebSocket -> Effect Unit)
-> Effect Canceler
mkOgmiosWebSocket' lvl serverCfg cb = do
mkOgmiosWebSocket' lvl serverCfg continue = do
utxoDispatchMap <- createMutableDispatch
chainTipDispatchMap <- createMutableDispatch
evaluateTxDispatchMap <- createMutableDispatch
Expand All @@ -609,7 +603,7 @@ mkOgmiosWebSocket' lvl serverCfg cb = do
currentEpochPendingRequests <- createPendingRequests
systemStartPendingRequests <- createPendingRequests
let
md = ogmiosMessageDispatch
messageDispatch = ogmiosMessageDispatch
{ utxoDispatchMap
, chainTipDispatchMap
, evaluateTxDispatchMap
Expand All @@ -622,8 +616,7 @@ mkOgmiosWebSocket' lvl serverCfg cb = do
ws <- _mkWebSocket (logger Debug) $ mkWsUrl serverCfg
let
sendRequest = _wsSend ws (logString lvl Debug)
onError = do
logString lvl Debug "WS error occured, resending requests"
resendPendingRequests = do
Ref.read utxoPendingRequests >>= traverse_ sendRequest
Ref.read chainTipPendingRequests >>= traverse_ sendRequest
Ref.read evaluateTxPendingRequests >>= traverse_ sendRequest
Expand All @@ -632,25 +625,57 @@ mkOgmiosWebSocket' lvl serverCfg cb = do
Ref.read eraSummariesPendingRequests >>= traverse_ sendRequest
Ref.read currentEpochPendingRequests >>= traverse_ sendRequest
Ref.read systemStartPendingRequests >>= traverse_ sendRequest
_onWsConnect ws do
_wsWatch ws (logger Debug) onError
_onWsMessage ws (logger Debug) $ defaultMessageListener lvl md
_onWsError ws (logger Error) $ const onError
cb $ Right $ WebSocket ws
{ utxo: mkListenerSet utxoDispatchMap utxoPendingRequests
, chainTip: mkListenerSet chainTipDispatchMap chainTipPendingRequests
, evaluate: mkListenerSet evaluateTxDispatchMap evaluateTxPendingRequests
, getProtocolParameters: mkListenerSet getProtocolParametersDispatchMap
getProtocolParametersPendingRequests
, submit: mkListenerSet submitDispatchMap submitPendingRequests
, eraSummaries:
mkListenerSet eraSummariesDispatchMap eraSummariesPendingRequests
, currentEpoch:
mkListenerSet currentEpochDispatchMap currentEpochPendingRequests
, systemStart:
mkListenerSet systemStartDispatchMap systemStartPendingRequests
}
pure $ Canceler $ \err -> liftEffect $ cb $ Left $ err
logString lvl Debug "Resent all pending requests"
-- We want to fail if the first connection attempt is not successful.
-- Otherwise, we start reconnecting indefinitely.
onFirstConnectionError errMessage = do
_wsClose ws
logger Error $
"First connection to Ogmios WebSocket failed. Terminating. Error: " <>
errMessage
_wsClose ws
continue $ Left $ error errMessage
firstConnectionErrorRef <- _onWsError ws (logger Error) onFirstConnectionError
hasConnectedOnceRef <- Ref.new false
_onWsConnect ws $ Ref.read hasConnectedOnceRef >>= case _ of
true -> do
logger Debug
"Ogmios WS connection re-established, resending pending requests..."
resendPendingRequests
false -> do
logger Debug "Ogmios Connection established"
Ref.write true hasConnectedOnceRef
_removeOnWsError ws firstConnectionErrorRef
_wsWatch ws (logger Debug) do
logger Debug "Ogmios WebSocket terminated by timeout. Reconnecting..."
_wsReconnect ws
_onWsMessage ws (logger Debug) $ defaultMessageListener lvl
messageDispatch
void $ _onWsError ws (logger Error) $ \err -> do
logString lvl Debug $
"Ogmios WebSocket error (" <> err <> "). Reconnecting..."
launchAff_ do
delay (wrap 500.0)
liftEffect $ _wsReconnect ws
continue $ Right $ WebSocket ws
{ utxo: mkListenerSet utxoDispatchMap utxoPendingRequests
, chainTip: mkListenerSet chainTipDispatchMap chainTipPendingRequests
, evaluate: mkListenerSet evaluateTxDispatchMap
evaluateTxPendingRequests
, getProtocolParameters: mkListenerSet
getProtocolParametersDispatchMap
getProtocolParametersPendingRequests
, submit: mkListenerSet submitDispatchMap submitPendingRequests
, eraSummaries:
mkListenerSet eraSummariesDispatchMap eraSummariesPendingRequests
, currentEpoch:
mkListenerSet currentEpochDispatchMap currentEpochPendingRequests
, systemStart:
mkListenerSet systemStartDispatchMap systemStartPendingRequests
}
pure $ Canceler $ \err -> liftEffect do
_wsClose ws
continue $ Left $ err
where
logger :: LogLevel -> String -> Effect Unit
logger = logString lvl
Expand All @@ -660,34 +685,65 @@ mkDatumCacheWebSocket'
-> ServerConfig
-> (Either Error DatumCacheWebSocket -> Effect Unit)
-> Effect Canceler
mkDatumCacheWebSocket' lvl serverCfg cb = do
mkDatumCacheWebSocket' lvl serverCfg continue = do
getDatumByHashDispatchMap <- createMutableDispatch
getDatumsByHashesDispatchMap <- createMutableDispatch
getDatumByHashPendingRequests <- createPendingRequests
getDatumsByHashesPendingRequests <- createPendingRequests
let
md = datumCacheMessageDispatch
messageDispatch = datumCacheMessageDispatch
{ getDatumByHashDispatchMap
, getDatumsByHashesDispatchMap
}
ws <- _mkWebSocket (logger Debug) $ mkOgmiosDatumCacheWsUrl serverCfg
let
sendRequest = _wsSend ws (logString lvl Debug)
onError = do
logString lvl Debug "Datum Cache: WS error occured, resending requests"
sendRequest = _wsSend ws (logger Debug)
resendPendingRequests = do
Ref.read getDatumByHashPendingRequests >>= traverse_ sendRequest
Ref.read getDatumsByHashesPendingRequests >>= traverse_ sendRequest
_onWsConnect ws $ do
_wsWatch ws (logger Debug) onError
_onWsMessage ws (logger Debug) $ defaultMessageListener lvl md
_onWsError ws (logger Error) $ const onError
cb $ Right $ WebSocket ws
{ getDatumByHash: mkListenerSet getDatumByHashDispatchMap
getDatumByHashPendingRequests
, getDatumsByHashes: mkListenerSet getDatumsByHashesDispatchMap
getDatumsByHashesPendingRequests
}
pure $ Canceler $ \err -> liftEffect $ cb $ Left $ err
-- We want to fail if the first connection attempt is not successful.
-- Otherwise, we start reconnecting indefinitely.
onFirstConnectionError errMessage = do
_wsClose ws
logger Error $
"First connection to Ogmios Datum Cache WebSocket failed. "
<> "Terminating. Error: "
<> errMessage
continue $ Left $ error errMessage
firstConnectionErrorRef <- _onWsError ws (logger Error) onFirstConnectionError
hasConnectedOnceRef <- Ref.new false
_onWsConnect ws $ Ref.read hasConnectedOnceRef >>= case _ of
true -> do
logger Debug $
"Ogmios Datum Cache WS connection re-established, resending " <>
"pending requests..."
resendPendingRequests
false -> do
logger Debug "Ogmios Datum Cache Connection established"
Ref.write true hasConnectedOnceRef
_removeOnWsError ws firstConnectionErrorRef
_wsWatch ws (logger Debug) do
logger Debug $ "Ogmios Datum Cache WebSocket terminated by " <>
"timeout. Reconnecting..."
_wsReconnect ws
_onWsMessage ws (logger Debug) $ defaultMessageListener lvl
messageDispatch
void $ _onWsError ws (logger Error) $ \err -> do
logger Debug $
"Ogmios Datum Cache WebSocket error (" <> err <>
"). Reconnecting..."
launchAff_ do
delay (wrap 500.0)
liftEffect $ _wsReconnect ws
continue $ Right $ WebSocket ws
{ getDatumByHash: mkListenerSet getDatumByHashDispatchMap
getDatumByHashPendingRequests
, getDatumsByHashes: mkListenerSet getDatumsByHashesDispatchMap
getDatumsByHashesPendingRequests
}
pure $ Canceler $ \err -> liftEffect do
_wsClose ws
continue $ Left $ err
where
logger :: LogLevel -> String -> Effect Unit
logger = logString lvl
Expand Down

0 comments on commit d40d8f7

Please sign in to comment.