diff --git a/cabal.project b/cabal.project index 3f58908ef2e..6838e11549e 100644 --- a/cabal.project +++ b/cabal.project @@ -14,6 +14,7 @@ packages: bench/locli bench/tx-generator plutus-example/plutus-example + datapoint-forward trace-dispatcher trace-forward trace-resources @@ -77,6 +78,9 @@ package cardano-testnet package cardano-tracer tests: True +package datapoint-forward + tests: True + package trace-dispatcher tests: True diff --git a/datapoint-forward/CHANGELOG.md b/datapoint-forward/CHANGELOG.md new file mode 100644 index 00000000000..27316fb6d97 --- /dev/null +++ b/datapoint-forward/CHANGELOG.md @@ -0,0 +1,3 @@ +# ChangeLog + +# 0.1.0 diff --git a/datapoint-forward/CODEOWNERS b/datapoint-forward/CODEOWNERS new file mode 100644 index 00000000000..6e6b1a89e87 --- /dev/null +++ b/datapoint-forward/CODEOWNERS @@ -0,0 +1,3 @@ +# General reviewers per PR +# Denis Serge Jürgen +* @denisshevchenko @deepfire @jutaro diff --git a/datapoint-forward/LICENSE b/datapoint-forward/LICENSE new file mode 100644 index 00000000000..f471221ad3a --- /dev/null +++ b/datapoint-forward/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2021 Input Output (Hong Kong) Ltd. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/datapoint-forward/NOTICE b/datapoint-forward/NOTICE new file mode 100644 index 00000000000..fb77bb84e9d --- /dev/null +++ b/datapoint-forward/NOTICE @@ -0,0 +1,14 @@ +Copyright 2021 Input Output (Hong Kong) Ltd. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/datapoint-forward/README.md b/datapoint-forward/README.md new file mode 100644 index 00000000000..435f147f835 --- /dev/null +++ b/datapoint-forward/README.md @@ -0,0 +1,9 @@ +# trace-forward + +`trace-forward` is a library allowing to forward tracing items from one process to another one. It is built upon [`typed-protocols`](https://github.com/input-output-hk/ouroboros-network/tree/master/typed-protocols). + +The `trace-dispatcher` is using `trace-forward` to forward `TraceObject`s from the node to exernal acceptors (for example, `cardano-tracer`). + +## Developers + +Benchmarking team is responsible for this library. The primary developer is [@denisshevchenko](https://github.com/denisshevchenko). diff --git a/datapoint-forward/datapoint-forward.cabal b/datapoint-forward/datapoint-forward.cabal new file mode 100644 index 00000000000..b7f7d426f55 --- /dev/null +++ b/datapoint-forward/datapoint-forward.cabal @@ -0,0 +1,90 @@ +cabal-version: 2.4 +name: datapoint-forward +version: 0.1.0 +synopsis: See README for more info +description: See README for more info +license: Apache-2.0 +license-file: LICENSE +copyright: 2021 Input Output (Hong Kong) Ltd. +author: IOHK +maintainer: operations@iohk.io +build-type: Simple +extra-doc-files: README.md + CHANGELOG.md + +common base { build-depends: base >= 4.14 && < 4.15 } + +common project-config + default-language: Haskell2010 + + ghc-options: -Wall + -Wcompat + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wno-unticked-promoted-constructors + -Wno-orphans + -Wpartial-fields + -Wredundant-constraints + -Wunused-packages + +library + import: base, project-config + hs-source-dirs: src + + exposed-modules: DataPoint.Forward.Acceptor + DataPoint.Forward.Configuration + DataPoint.Forward.Forwarder + DataPoint.Forward.Queue + DataPoint.Forward.Utils + + DataPoint.Forward.Network.Acceptor + DataPoint.Forward.Network.Forwarder + + DataPoint.Forward.Protocol.Acceptor + DataPoint.Forward.Protocol.Codec + DataPoint.Forward.Protocol.Forwarder + DataPoint.Forward.Protocol.Type + + build-depends: async + , bytestring + , cborg + , contra-tracer + , extra + , io-classes + , ouroboros-network-framework + , serialise + , stm + , text + , time + , typed-protocols + +test-suite test + import: base, project-config + type: exitcode-stdio-1.0 + main-is: Main.hs + hs-source-dirs: test + + other-modules: Test.DataPoint.Forward.Protocol.Codec + Test.DataPoint.Forward.Protocol.Tests + Test.DataPoint.Forward.Protocol.TraceItem + Test.DataPoint.Forward.Demo.Configs + Test.DataPoint.Forward.Demo.Tests + + build-depends: async + , contra-tracer + , directory + , extra + , filepath + , ouroboros-network-framework + , datapoint-forward + , QuickCheck + , serialise + , tasty + , tasty-quickcheck + , typed-protocols + , typed-protocols-examples + , text + , time + + ghc-options: -rtsopts + -threaded diff --git a/datapoint-forward/src/DataPoint/Forward/Acceptor.hs b/datapoint-forward/src/DataPoint/Forward/Acceptor.hs new file mode 100644 index 00000000000..1c7bd7f1c0d --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Acceptor.hs @@ -0,0 +1,28 @@ +{-# LANGUAGE NamedFieldPuns #-} + +-- | This top-level module will be used by the acceptor application. +-- Acceptor application asks 'TraceObject's from the forwarder application. +module DataPoint.Forward.Acceptor + ( runTraceAcceptor + ) where + +import qualified Codec.Serialise as CBOR +import Data.Typeable (Typeable) + +import Ouroboros.Network.IOManager (IOManager) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +import DataPoint.Forward.Network.Acceptor (listenToForwarder) +import DataPoint.Forward.Configuration (AcceptorConfiguration (..)) +import DataPoint.Forward.Utils (runActionInLoop) + +runTraceAcceptor + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => IOManager -- ^ 'IOManager' from the external application. + -> AcceptorConfiguration lo -- ^ Acceptor configuration. + -> ([lo] -> IO ()) -- ^ The handler for 'TraceObject's received from the node. + -> IO () +runTraceAcceptor iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler = + runActionInLoop (listenToForwarder iomgr config loHandler) forwarderEndpoint 1 diff --git a/datapoint-forward/src/DataPoint/Forward/Configuration.hs b/datapoint-forward/src/DataPoint/Forward/Configuration.hs new file mode 100644 index 00000000000..5f5a92ff807 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Configuration.hs @@ -0,0 +1,48 @@ +module DataPoint.Forward.Configuration + ( AcceptorConfiguration (..) + , ForwarderConfiguration (..) + , HowToConnect (..) + ) where + +import Control.Tracer (Tracer) +import GHC.Conc (TVar) +import Ouroboros.Network.Driver (TraceSendRecv) + +import DataPoint.Forward.Protocol.Type + +-- | Specifies how to connect to the peer. +-- Currently, only local socket/pipe is used. +newtype HowToConnect = LocalPipe FilePath + deriving Show + +-- | Acceptor configuration, parameterized by trace item's type. +data AcceptorConfiguration lo = AcceptorConfiguration + { -- | The tracer that will be used by the acceptor in its network layer. + acceptorTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo))) + -- | The endpoint that will be used to listen to the forwarder. + , forwarderEndpoint :: !HowToConnect + -- | The request specifies how many 'TraceObject's will be requested. + , whatToRequest :: !NumberOfTraceObjects + -- | 'TVar' that can be used as a brake: if an external thread sets + -- it to 'True', the acceptor will send 'MsgDone' message to the + -- forwarder and their session will be closed. + , shouldWeStop :: !(TVar Bool) + } + +-- | Forwarder configuration, parameterized by trace item's type. +data ForwarderConfiguration lo = ForwarderConfiguration + { -- | The tracer that will be used by the forwarder in its network layer. + forwarderTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo))) + -- | The endpoint that will be used to connect to the acceptor. + , acceptorEndpoint :: !HowToConnect + -- | The big size of internal queue for tracing items. We use it in + -- the beginning of the session, to avoid queue overflow, because + -- initially there is no connection with acceptor yet, and the + -- number of tracing items after node's start may be very big. + , disconnectedQueueSize :: !Word + -- | The small size of internal queue for tracing items. We use it + -- after the big queue is empty, which means that acceptor is connected + -- and tracing items are already forwarded to it. We switch to small + -- queue to reduce memory usage in the node. + , connectedQueueSize :: !Word + } diff --git a/datapoint-forward/src/DataPoint/Forward/Forwarder.hs b/datapoint-forward/src/DataPoint/Forward/Forwarder.hs new file mode 100644 index 00000000000..93c70a1ee02 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Forwarder.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE NamedFieldPuns #-} + +-- This top-level module will be used by the forwarder application. +-- Forwarder application collects 'TraceObject's and sends them to +-- the acceptor application. +module DataPoint.Forward.Forwarder + ( runTraceForwarder + ) where + +import qualified Codec.Serialise as CBOR + +import Ouroboros.Network.IOManager (IOManager) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +import DataPoint.Forward.Configuration (ForwarderConfiguration (..)) +import DataPoint.Forward.Network.Forwarder (connectToAcceptor) +import DataPoint.Forward.Utils + +runTraceForwarder + :: (CBOR.Serialise lo, + ShowProxy lo) + => IOManager -- ^ 'IOManager' from the external application. + -> ForwarderConfiguration lo -- ^ Forwarder configuration. + -> ForwardSink lo -- ^ Forward "sink" that will be used to write tracing items. + -> IO () +runTraceForwarder iomgr config@ForwarderConfiguration{acceptorEndpoint} forwardSink = + runActionInLoop (connectToAcceptor iomgr config forwardSink) acceptorEndpoint 1 diff --git a/datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs b/datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs new file mode 100644 index 00000000000..3c8b455f9e2 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs @@ -0,0 +1,182 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} + +module DataPoint.Forward.Network.Acceptor + ( listenToForwarder + -- | Export this function for Mux purpose. + , acceptTraceObjects + , acceptTraceObjectsInit + , Timeout (..) + ) where + +import Codec.CBOR.Term (Term) +import qualified Codec.Serialise as CBOR +import Control.Concurrent.Async (race, race_, wait) +import Control.Monad.Extra (ifM) +import Control.Monad.STM (atomically, check) +import Control.Concurrent.STM.TVar (TVar, readTVar, readTVarIO, registerDelay) +import Control.Exception (Exception, throwIO) +import qualified Data.ByteString.Lazy as LBS +import Data.Typeable (Typeable) +import Data.Void (Void) +import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), + MiniProtocolNum (..), MuxMode (..), + OuroborosApplication (..), MuxPeer (..), + RunMiniProtocol (..), + miniProtocolLimits, miniProtocolNum, miniProtocolRun) +import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.ErrorPolicy (nullErrorPolicies) +import Ouroboros.Network.IOManager (IOManager) +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) +import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..), + SomeResponderApplication (..), + cleanNetworkMutableState, newNetworkMutableState, + nullNetworkServerTracers, withServerNode) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), + UnversionedProtocolData (..), + unversionedHandshakeCodec, + unversionedProtocolDataCodec) +import Ouroboros.Network.Protocol.Handshake.Type (Handshake) +import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, + simpleSingletonVersions) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +import qualified DataPoint.Forward.Protocol.Acceptor as Acceptor +import qualified DataPoint.Forward.Protocol.Codec as Acceptor +import DataPoint.Forward.Protocol.Type +import DataPoint.Forward.Queue (getTraceObjects) +import DataPoint.Forward.Configuration (AcceptorConfiguration (..), HowToConnect (..)) + +listenToForwarder + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => IOManager + -> AcceptorConfiguration lo + -> ([lo] -> IO ()) + -> IO () +listenToForwarder iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler = do + let (LocalPipe localPipe) = forwarderEndpoint + snocket = localSnocket iomgr + address = localAddressFromPath localPipe + doListenToForwarder snocket address noTimeLimitsHandshake app + where + app = + -- TODO: There's _shouldStopSTM and 'shouldWeStop' in + -- 'AcceptorConfiguration'. Currently 'ouroboros-network' does not exposes + -- the write end of `_shouldStopSTM`, if it did we could use it instead of + -- 'shouldWeStop'. + OuroborosApplication $ \_connectionId _shouldStopSTM -> + [ MiniProtocol + { miniProtocolNum = MiniProtocolNum 1 + , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } + , miniProtocolRun = acceptTraceObjects config loHandler + } + ] + +doListenToForwarder + :: Ord addr + => Snocket IO fd addr + -> addr + -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) + -> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void () + -> IO () +doListenToForwarder snocket address timeLimits app = do + networkState <- newNetworkMutableState + race_ (cleanNetworkMutableState networkState) + $ withServerNode + snocket + nullNetworkServerTracers + networkState + (AcceptedConnectionsLimit maxBound maxBound 0) + address + unversionedHandshakeCodec + timeLimits + (cborTermVersionDataCodec unversionedProtocolDataCodec) + acceptableVersion + (simpleSingletonVersions + UnversionedProtocol + UnversionedProtocolData + (SomeResponderApplication app)) + nullErrorPolicies + $ \_ serverAsync -> wait serverAsync -- Block until async exception. + +acceptTraceObjects + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo + -> ([lo] -> IO ()) + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +acceptTraceObjects config loHandler = + ResponderProtocolOnly $ + MuxPeerRaw $ \channel -> + timeoutWhenStopped + (shouldWeStop config) + 15_000 -- 15sec + $ runPeer + (acceptorTracer config) + (Acceptor.codecTraceForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Acceptor.traceAcceptorPeer $ acceptorActions config loHandler) + +acceptTraceObjectsInit + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo + -> ([lo] -> IO ()) + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +acceptTraceObjectsInit config loHandler = + InitiatorProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (acceptorTracer config) + (Acceptor.codecTraceForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Acceptor.traceAcceptorPeer $ acceptorActions config loHandler) + +acceptorActions + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo -- ^ Acceptor's configuration. + -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. + -> Acceptor.TraceAcceptor lo IO () +acceptorActions config@AcceptorConfiguration{whatToRequest, shouldWeStop} loHandler = + Acceptor.SendMsgTraceObjectsRequest TokBlocking whatToRequest $ \replyWithTraceObjects -> do + loHandler $ getTraceObjects replyWithTraceObjects + checkIfWeShouldStop + where + checkIfWeShouldStop = + ifM (readTVarIO shouldWeStop) + (return $ Acceptor.SendMsgDone $ return ()) + (return $ acceptorActions config loHandler) + +data Timeout = Timeout + deriving (Typeable, Show) + +instance Exception Timeout where + +-- | Timeout shutdown of an action. It can run only for specified miliseconds +-- once the 'TVar' is set to 'True'. +-- +timeoutWhenStopped :: TVar Bool + -> Int -- timeout in miliseconds + -> IO a + -> IO a +timeoutWhenStopped stopVar delay io = + either id id <$> + race io + ( do atomically (readTVar stopVar >>= check) + v <- registerDelay delay + atomically (readTVar v >>= check) + throwIO Timeout + ) diff --git a/datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs b/datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs new file mode 100644 index 00000000000..5dd2bd86786 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs @@ -0,0 +1,114 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Network.Forwarder + ( connectToAcceptor + -- | Export this function for Mux purpose. + , forwardTraceObjects + , forwardTraceObjectsResp + ) where + +import Codec.CBOR.Term (Term) +import qualified Codec.Serialise as CBOR +import qualified Data.ByteString.Lazy as LBS +import Data.Void (Void) +import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.IOManager (IOManager) +import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), + MiniProtocolNum (..), MuxMode (..), + OuroborosApplication (..), MuxPeer (..), + RunMiniProtocol (..), + miniProtocolLimits, miniProtocolNum, miniProtocolRun) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), + UnversionedProtocolData (..), + unversionedHandshakeCodec, + unversionedProtocolDataCodec) +import Ouroboros.Network.Protocol.Handshake.Type (Handshake) +import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions) +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) +import Ouroboros.Network.Socket (connectToNode, nullNetworkConnectTracers) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +import DataPoint.Forward.Configuration (ForwarderConfiguration (..), HowToConnect (..)) +import DataPoint.Forward.Queue (readItems) +import DataPoint.Forward.Utils +import qualified DataPoint.Forward.Protocol.Forwarder as Forwarder +import qualified DataPoint.Forward.Protocol.Codec as Forwarder + +connectToAcceptor + :: (CBOR.Serialise lo, + ShowProxy lo) + => IOManager + -> ForwarderConfiguration lo + -> ForwardSink lo + -> IO () +connectToAcceptor iomgr config@ForwarderConfiguration{acceptorEndpoint} sink = do + let (LocalPipe localPipe) = acceptorEndpoint + snocket = localSnocket iomgr + address = localAddressFromPath localPipe + doConnectToAcceptor snocket address noTimeLimitsHandshake app + where + app = + OuroborosApplication $ \_connectionId _shouldStopSTM -> + [ MiniProtocol + { miniProtocolNum = MiniProtocolNum 1 + , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } + , miniProtocolRun = forwardTraceObjects config sink + } + ] + +doConnectToAcceptor + :: Snocket IO fd addr + -> addr + -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) + -> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void + -> IO () +doConnectToAcceptor snocket address timeLimits app = + connectToNode + snocket + unversionedHandshakeCodec + timeLimits + (cborTermVersionDataCodec unversionedProtocolDataCodec) + nullNetworkConnectTracers + acceptableVersion + (simpleSingletonVersions + UnversionedProtocol + UnversionedProtocolData + app) + Nothing + address + +forwardTraceObjects + :: (CBOR.Serialise lo, + ShowProxy lo) + => ForwarderConfiguration lo + -> ForwardSink lo + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +forwardTraceObjects config sink = + InitiatorProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (forwarderTracer config) + (Forwarder.codecTraceForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Forwarder.traceForwarderPeer $ readItems config sink) + +forwardTraceObjectsResp + :: (CBOR.Serialise lo, + ShowProxy lo) + => ForwarderConfiguration lo + -> ForwardSink lo + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +forwardTraceObjectsResp config sink = + ResponderProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (forwarderTracer config) + (Forwarder.codecTraceForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Forwarder.traceForwarderPeer $ readItems config sink) diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs new file mode 100644 index 00000000000..6482bae4ea2 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs @@ -0,0 +1,65 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} + +-- | A view of the trace forwarding/accepting protocol +-- from the point of view of the client. +-- +-- For execution, a conversion into the typed protocol is provided. +-- +module DataPoint.Forward.Protocol.Acceptor + ( TraceAcceptor(..) + , traceAcceptorPeer + ) where + +import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), + PeerRole (..)) + +import DataPoint.Forward.Protocol.Type + +data TraceAcceptor lo m a where + SendMsgTraceObjectsRequest + :: TokBlockingStyle blocking + -> NumberOfTraceObjects + -> (BlockingReplyList blocking lo -> m (TraceAcceptor lo m a)) + -> TraceAcceptor lo m a + + SendMsgDone + :: m a + -> TraceAcceptor lo m a + +-- | Interpret a particular action sequence into the client side of the protocol. +-- +traceAcceptorPeer + :: Monad m + => TraceAcceptor lo m a + -> Peer (TraceForward lo) 'AsClient 'StIdle m a +traceAcceptorPeer = \case + SendMsgTraceObjectsRequest TokBlocking request next -> + -- Send our message (request for new 'TraceObject's from the forwarder). + Yield (ClientAgency TokIdle) (MsgTraceObjectsRequest TokBlocking request) $ + -- We're now into the 'StBusy' state, and now we'll wait for a reply + -- from the forwarder. + Await (ServerAgency (TokBusy TokBlocking)) $ \(MsgTraceObjectsReply reply) -> + Effect $ + traceAcceptorPeer <$> next reply + + SendMsgTraceObjectsRequest TokNonBlocking request next -> + -- Send our message (request for new 'TraceObject's from the forwarder). + Yield (ClientAgency TokIdle) (MsgTraceObjectsRequest TokNonBlocking request) $ + -- We're now into the 'StBusy' state, and now we'll wait for a reply + -- from the forwarder. It is assuming that the forwarder will reply + -- immediately (even there are no 'TraceObject's). + Await (ServerAgency (TokBusy TokNonBlocking)) $ \(MsgTraceObjectsReply reply) -> + Effect $ + traceAcceptorPeer <$> next reply + + SendMsgDone getResult -> + -- We do an actual transition using 'yield', to go from the 'StIdle' to + -- 'StDone' state. Once in the 'StDone' state we can actually stop using + -- 'done', with a return value. + Effect $ + Yield (ClientAgency TokIdle) MsgDone . Done TokDone + <$> getResult diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs new file mode 100644 index 00000000000..a03b2442687 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs @@ -0,0 +1,108 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module DataPoint.Forward.Protocol.Codec ( + codecTraceForward + ) where + +import qualified Codec.CBOR.Decoding as CBOR +import qualified Codec.CBOR.Encoding as CBOR +import Codec.CBOR.Read (DeserialiseFailure) +import Control.Monad.Class.MonadST (MonadST) +import qualified Data.ByteString.Lazy as LBS +import qualified Data.List.NonEmpty as NE +import Text.Printf (printf) +import Ouroboros.Network.Codec (Codec, PeerHasAgency (..), + PeerRole (..), SomeMessage (..), + mkCodecCborLazyBS) + +import DataPoint.Forward.Protocol.Type + +codecTraceForward + :: forall lo m. + MonadST m + => (NumberOfTraceObjects -> CBOR.Encoding) -- ^ Encoder for 'Request'. + -> (forall s . CBOR.Decoder s NumberOfTraceObjects) -- ^ Decoder for 'Request'. + -> ([lo] -> CBOR.Encoding) -- ^ Encoder for reply with list of 'TraceObject's. + -> (forall s . CBOR.Decoder s [lo]) -- ^ Decoder for reply with list of 'TraceObject's. + -> Codec (TraceForward lo) + DeserialiseFailure m LBS.ByteString +codecTraceForward encodeRequest decodeRequest + encodeReplyList decodeReplyList = + mkCodecCborLazyBS encode decode + where + -- Encode messages. + encode + :: forall (pr :: PeerRole) + (st :: TraceForward lo) + (st' :: TraceForward lo). + PeerHasAgency pr st + -> Message (TraceForward lo) st st' + -> CBOR.Encoding + + encode (ClientAgency TokIdle) (MsgTraceObjectsRequest blocking request) = + CBOR.encodeListLen 3 + <> CBOR.encodeWord 1 + <> CBOR.encodeBool (case blocking of + TokBlocking -> True + TokNonBlocking -> False) + <> encodeRequest request + + encode (ClientAgency TokIdle) MsgDone = + CBOR.encodeListLen 1 + <> CBOR.encodeWord 2 + + encode (ServerAgency (TokBusy _)) (MsgTraceObjectsReply reply) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 4 + <> encodeReplyList replyList + where + replyList = + case reply of + BlockingReply los -> NE.toList los + NonBlockingReply los -> los + + -- Decode messages + decode + :: forall (pr :: PeerRole) + (st :: TraceForward lo) s. + PeerHasAgency pr st + -> CBOR.Decoder s (SomeMessage st) + decode stok = do + len <- CBOR.decodeListLen + key <- CBOR.decodeWord + case (key, len, stok) of + (1, 3, ClientAgency TokIdle) -> do + blocking <- CBOR.decodeBool + request <- decodeRequest + return $! + if blocking then + SomeMessage $ MsgTraceObjectsRequest TokBlocking request + else + SomeMessage $ MsgTraceObjectsRequest TokNonBlocking request + + (2, 1, ClientAgency TokIdle) -> + return $ SomeMessage MsgDone + + (4, 2, ServerAgency (TokBusy blocking)) -> do + replyList <- decodeReplyList + case (blocking, replyList) of + (TokBlocking, x:xs) -> + return $ SomeMessage (MsgTraceObjectsReply (BlockingReply (x NE.:| xs))) + + (TokNonBlocking, los) -> + return $ SomeMessage (MsgTraceObjectsReply (NonBlockingReply los)) + + (TokBlocking, []) -> + fail "codecTraceForward: MsgTraceObjectsReply: empty list not permitted" + + -- Failures per protocol state + (_, _, ClientAgency TokIdle) -> + fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len) + (_, _, ServerAgency (TokBusy TokBlocking)) -> + fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len) + (_, _, ServerAgency (TokBusy TokNonBlocking)) -> + fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len) diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs new file mode 100644 index 00000000000..0c8df2a8f71 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs @@ -0,0 +1,51 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Protocol.Forwarder + ( TraceForwarder (..) + , traceForwarderPeer + ) where + +import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), + PeerRole (..)) + +import DataPoint.Forward.Protocol.Type + +data TraceForwarder lo m a = TraceForwarder + { -- | The acceptor sent us a request for new 'TraceObject's. + recvMsgTraceObjectsRequest + :: forall blocking. + TokBlockingStyle blocking + -> NumberOfTraceObjects + -> m (BlockingReplyList blocking lo, TraceForwarder lo m a) + + -- | The acceptor terminated. Here we have a pure return value, but we + -- could have done another action in 'm' if we wanted to. + , recvMsgDone :: m a + } + +-- | Interpret a particular action sequence into the server side of the protocol. +-- +traceForwarderPeer + :: Monad m + => TraceForwarder lo m a + -> Peer (TraceForward lo) 'AsServer 'StIdle m a +traceForwarderPeer TraceForwarder{recvMsgTraceObjectsRequest, recvMsgDone} = + -- In the 'StIdle' state the forwarder is awaiting a request message + -- from the acceptor. + Await (ClientAgency TokIdle) $ \case + -- The acceptor sent us a request for new 'TraceObject's, so now we're + -- in the 'StBusy' state which means it's the forwarder's turn to send + -- a reply. + MsgTraceObjectsRequest blocking request -> Effect $ do + (reply, next) <- recvMsgTraceObjectsRequest blocking request + return $ Yield (ServerAgency (TokBusy blocking)) + (MsgTraceObjectsReply reply) + (traceForwarderPeer next) + + -- The acceptor sent the done transition, so we're in the 'StDone' state + -- so all we can do is stop using 'done', with a return value. + MsgDone -> Effect $ Done TokDone <$> recvMsgDone diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs new file mode 100644 index 00000000000..ee33f6fde56 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs @@ -0,0 +1,182 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE EmptyCase #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeFamilies #-} + +-- | The type of the trace forwarding/accepting protocol. +-- +-- Since we are using a typed protocol framework this is in some sense /the/ +-- definition of the protocol: what is allowed and what is not allowed. + +module DataPoint.Forward.Protocol.Type + ( TraceForward (..) + , TokBlockingStyle (..) + , Message (..) + , ClientHasAgency (..) + , ServerHasAgency (..) + , NobodyHasAgency (..) + , NumberOfTraceObjects (..) + , BlockingReplyList (..) + ) where + +import Codec.Serialise (Serialise (..)) +import Data.List.NonEmpty (NonEmpty) +import Data.Proxy (Proxy(..)) +import Data.Word (Word16) +import GHC.Generics (Generic) +import Network.TypedProtocol.Core (Protocol (..)) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +-- | A kind to identify our protocol, and the types of the states in the state +-- transition diagram of the protocol. +-- +-- IMPORTANT NOTE: the following terminology is used: +-- +-- 1. From the protocol's point of view, two peers talk to each other: +-- the forwarder and the acceptor. +-- 2. The forwarder is an application that collects 'TraceObject's and sends +-- them to the acceptor by request (with 'MsgTraceObjectsReply'). +-- 3. The acceptor is an application that receives 'TraceObject's from the +-- forwarder. +-- 4. You can think of the acceptor as a client, and the forwarder as a server. +-- After the connection is established, the acceptor asks for 'TraceObject's, +-- the forwarder replies to it. + +-- | The acceptor will send this request to the forwarder. +newtype NumberOfTraceObjects = NumberOfTraceObjects + { nTraceObjects :: Word16 + } deriving (Eq, Generic, Show) + +instance ShowProxy NumberOfTraceObjects +instance Serialise NumberOfTraceObjects + +data TraceForward lo where + + -- | Both acceptor and forwarder are in idle state. The acceptor can send a + -- request for node's info ('MsgNodeInfoRequest') OR a request for a list + -- of 'TraceObject's ('MsgTraceObjectsRequest'); the forwarder is waiting for a request. + -- It will replay either with 'MsgNodeInfoReply' or 'MsgTraceObjectsReply'. + -- + -- Node's info is an important information about the node, such as + -- its protocol, version, start time, etc. It is assuming that the node + -- must provide this information. + StIdle :: TraceForward lo + + -- | The acceptor has sent a next request for 'TraceObject's. The acceptor is + -- now waiting for a reply, and the forwarder is busy getting ready to send a + -- reply with new list of 'TraceObject's. + -- + -- There are two sub-states for this, for blocking and non-blocking cases. + StBusy :: StBlockingStyle -> TraceForward lo + + -- | Both the acceptor and forwarder are in the terminal state. They're done. + StDone :: TraceForward lo + +instance (ShowProxy lo) + => ShowProxy (TraceForward lo) where + showProxy _ = concat + [ "TraceForward (" + , showProxy (Proxy :: Proxy lo) + , ")" + ] + +data StBlockingStyle where + -- | In this sub-state the reply need not be prompt. There is no timeout. + StBlocking :: StBlockingStyle + -- | In this sub-state the peer must reply. There is a timeout. + StNonBlocking :: StBlockingStyle + +-- | The value level equivalent of 'StBlockingStyle'. +-- +-- This is also used in 'MsgTraceObjectsRequest' where it is interpreted (and can be encoded) +-- as a 'Bool' with 'True' for blocking, and 'False' for non-blocking. +data TokBlockingStyle (k :: StBlockingStyle) where + TokBlocking :: TokBlockingStyle 'StBlocking + TokNonBlocking :: TokBlockingStyle 'StNonBlocking + +deriving instance Eq (TokBlockingStyle b) +deriving instance Show (TokBlockingStyle b) + +-- | We have requests for lists of things. In the blocking case the +-- corresponding reply must be non-empty, whereas in the non-blocking case +-- an empty reply is fine. +-- +data BlockingReplyList (blocking :: StBlockingStyle) lo where + BlockingReply :: NonEmpty lo -> BlockingReplyList 'StBlocking lo + NonBlockingReply :: [lo] -> BlockingReplyList 'StNonBlocking lo + +deriving instance Eq lo => Eq (BlockingReplyList blocking lo) +deriving instance Show lo => Show (BlockingReplyList blocking lo) + +instance Protocol (TraceForward lo) where + + -- | The messages in the trace forwarding/accepting protocol. + -- + data Message (TraceForward lo) from to where + -- | Request the list of 'TraceObject's from the forwarder. + -- State: Idle -> Busy. + -- + -- With 'TokBlocking' this is a a blocking operation: the reply will + -- always have at least one 'TraceObject', and it does not expect a prompt + -- reply: there is no timeout. This covers the case when there + -- is nothing else to do but wait. + -- + -- With 'TokNonBlocking' this is a non-blocking operation: the reply + -- may be an empty list and this does expect a prompt reply. + MsgTraceObjectsRequest + :: TokBlockingStyle blocking + -> NumberOfTraceObjects + -> Message (TraceForward lo) 'StIdle ('StBusy blocking) + + -- | Reply with a list of 'TraceObject's for the acceptor. + -- State: Busy -> Idle. + MsgTraceObjectsReply + :: BlockingReplyList blocking lo + -> Message (TraceForward lo) ('StBusy blocking) 'StIdle + + -- | Terminating message. State: Idle -> Done. + MsgDone + :: Message (TraceForward lo) 'StIdle 'StDone + + -- | This is an explanation of our states, in terms of which party has agency + -- in each state. + -- + -- 1. When both peers are in Idle state, the acceptor can send a message + -- to the forwarder (request for new 'TraceObject's), + -- 2. When both peers are in Busy state, the forwarder is expected to send + -- a reply to the acceptor (list of new 'TraceObject's). + -- + -- So we assume that, from __interaction__ point of view: + -- 1. ClientHasAgency (from 'Network.TypedProtocol.Core') corresponds to acceptor's agency. + -- 3. ServerHasAgency (from 'Network.TypedProtocol.Core') corresponds to forwarder's agency. + -- + data ClientHasAgency st where + TokIdle :: ClientHasAgency 'StIdle + + data ServerHasAgency st where + TokBusy :: TokBlockingStyle blocking -> ServerHasAgency ('StBusy blocking) + + data NobodyHasAgency st where + TokDone :: NobodyHasAgency 'StDone + + -- | Impossible cases. + exclusionLemma_ClientAndServerHaveAgency TokIdle tok = case tok of {} + exclusionLemma_NobodyAndClientHaveAgency TokDone tok = case tok of {} + exclusionLemma_NobodyAndServerHaveAgency TokDone tok = case tok of {} + +instance Show lo + => Show (Message (TraceForward lo) from to) where + show MsgTraceObjectsRequest{} = "MsgTraceObjectsRequest" + show MsgTraceObjectsReply{} = "MsgTraceObjectsReply" + show MsgDone{} = "MsgDone" + +instance Show (ClientHasAgency (st :: TraceForward lo)) where + show TokIdle = "TokIdle" + +instance Show (ServerHasAgency (st :: TraceForward lo)) where + show TokBusy{} = "TokBusy" diff --git a/datapoint-forward/src/DataPoint/Forward/Queue.hs b/datapoint-forward/src/DataPoint/Forward/Queue.hs new file mode 100644 index 00000000000..fd67dbf33db --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Queue.hs @@ -0,0 +1,61 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Queue + ( readItems + , getTraceObjects + ) where + +import Control.Concurrent.STM (STM, atomically, retry) +import Control.Concurrent.STM.TBQueue +import Control.Concurrent.STM.TVar +import Control.Monad (unless) +import qualified Data.List.NonEmpty as NE +import Data.Word (Word16) + +import DataPoint.Forward.Configuration (ForwarderConfiguration (..)) +import qualified DataPoint.Forward.Protocol.Forwarder as Forwarder +import DataPoint.Forward.Protocol.Type +import DataPoint.Forward.Utils + +readItems + :: ForwarderConfiguration lo -- ^ The forwarder configuration. + -> ForwardSink lo -- ^ The sink contains the queue we read 'TraceObject's from. + -> Forwarder.TraceForwarder lo IO () +readItems config sink@ForwardSink{forwardQueue, wasUsed} = + Forwarder.TraceForwarder + { Forwarder.recvMsgTraceObjectsRequest = \blocking (NumberOfTraceObjects n) -> do + replyList <- + case blocking of + TokBlocking -> do + objs <- atomically $ getNTraceObjects n forwardQueue >>= \case + [] -> retry -- No 'TraceObject's yet, just wait... + (x:xs) -> return $ x NE.:| xs + atomically . modifyTVar' wasUsed . const $ True + return $ BlockingReply objs + TokNonBlocking -> do + objs <- atomically $ getNTraceObjects n forwardQueue + unless (null objs) $ + atomically . modifyTVar' wasUsed . const $ True + return $ NonBlockingReply objs + return (replyList, readItems config sink) + , Forwarder.recvMsgDone = return () + } + +-- | Returns at most N 'TraceObject's from the queue. +getNTraceObjects + :: Word16 + -> TVar (TBQueue lo) + -> STM [lo] +getNTraceObjects 0 _ = return [] +getNTraceObjects n q = + readTVar q >>= tryReadTBQueue >>= \case + Just lo' -> (lo' :) <$> getNTraceObjects (n - 1) q + Nothing -> return [] + +getTraceObjects + :: BlockingReplyList blocking lo -- ^ The reply with list of 'TraceObject's. + -> [lo] +getTraceObjects (BlockingReply neList) = NE.toList neList +getTraceObjects (NonBlockingReply list) = list diff --git a/datapoint-forward/src/DataPoint/Forward/Utils.hs b/datapoint-forward/src/DataPoint/Forward/Utils.hs new file mode 100644 index 00000000000..61e9846767b --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Utils.hs @@ -0,0 +1,119 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Utils + ( ForwardSink (..) + , initForwardSink + , writeToSink + , runActionInLoop + ) where + +import Control.Concurrent.STM (atomically) +import Control.Concurrent.STM.TBQueue +import Control.Concurrent.STM.TVar +import Control.Exception (SomeAsyncException (..), fromException, tryJust) +import Control.Monad.Extra (whenM) +import Control.Tracer (showTracing, stdoutTracer, traceWith) +import System.IO +import System.Time.Extra (sleep) + +import DataPoint.Forward.Configuration + +-- | Run monadic action in a loop. If there's an exception, it will re-run +-- the action again, after pause that grows. +runActionInLoop + :: IO () + -> HowToConnect + -> Word + -> IO () +runActionInLoop action endpoint prevDelay = + tryJust excludeAsyncExceptions action >>= \case + Left e -> do + logTrace $ "trace-forward, connection with " <> show endpoint <> " failed: " <> show e + sleep $ fromIntegral currentDelay + runActionInLoop action endpoint currentDelay + Right _ -> return () + where + excludeAsyncExceptions e = + case fromException e of + Just SomeAsyncException {} -> Nothing + _ -> Just e + + logTrace = traceWith $ showTracing stdoutTracer + + currentDelay = + if prevDelay < 60 + then prevDelay * 2 + else 60 -- After we reached 60+ secs delay, repeat an attempt every minute. + +data ForwardSink lo = ForwardSink + { forwardQueue :: !(TVar (TBQueue lo)) + , disconnectedSize :: !Word + , connectedSize :: !Word + , wasUsed :: !(TVar Bool) + } + +initForwardSink + :: ForwarderConfiguration lo + -> IO (ForwardSink lo) +initForwardSink ForwarderConfiguration{disconnectedQueueSize, connectedQueueSize} = do + -- Initially we always create a big queue, because during node's start + -- the number of tracing items may be very big. + (queue, used) <- + atomically $ (,) <$> (newTVar =<< newTBQueue (fromIntegral disconnectedQueueSize)) + <*> newTVar False + return $ ForwardSink + { forwardQueue = queue + , disconnectedSize = disconnectedQueueSize + , connectedSize = connectedQueueSize + , wasUsed = used + } + +-- | There are 4 possible cases when we try to write tracing item: +-- 1. The queue is __still__ empty (no tracing items were writen in it). +-- 2. The queue is __already__ empty (all previously written items were taken from it). +-- 3. The queue is full. In this case flush all tracing items to stdout and continue. +-- 4. The queue isn't empty and isn't full. Just continue writing. +writeToSink + :: Show lo + => ForwardSink lo + -> lo + -> IO () +writeToSink ForwardSink{forwardQueue, disconnectedSize, connectedSize, wasUsed} traceObject = do + q <- readTVarIO forwardQueue + atomically ((,) <$> isFullTBQueue q + <*> isEmptyTBQueue q) >>= \case + (True, _) -> maybeFlushQueueToStdout q + (_, True) -> checkIfSinkWasUsed q + (_, _) -> return () + atomically $ readTVar forwardQueue >>= flip writeTBQueue traceObject + where + -- The queue is full, but if it's a small queue, we can switch it + -- to a big one and give a chance not to flush items to stdout yet. + maybeFlushQueueToStdout q = do + qLen <- atomically $ lengthTBQueue q + if fromIntegral qLen == connectedSize + then atomically $ do + -- The small queue is full, so we have to switch to a big one and + -- then flush collected items from the small queue and store them in + -- a big one. + acceptedItems <- flushTBQueue q + switchQueue disconnectedSize + bigQ <- readTVar forwardQueue + mapM_ (writeTBQueue bigQ) acceptedItems + else do + -- The big queue is full, we have to flush it to stdout. + atomically (flushTBQueue q) >>= mapM_ print + hFlush stdout + + checkIfSinkWasUsed q = atomically $ + whenM (readTVar wasUsed) $ switchToAnotherQueue q + + switchToAnotherQueue q = do + qLen <- lengthTBQueue q + if fromIntegral qLen == disconnectedSize + then switchQueue connectedSize + else switchQueue disconnectedSize + + switchQueue size = + newTBQueue (fromIntegral size) >>= modifyTVar' forwardQueue . const diff --git a/datapoint-forward/test/Main.hs b/datapoint-forward/test/Main.hs new file mode 100644 index 00000000000..d4839f03264 --- /dev/null +++ b/datapoint-forward/test/Main.hs @@ -0,0 +1,15 @@ +module Main (main) where + +import Test.Tasty + +import qualified Test.DataPoint.Forward.Protocol.Tests as Protocol +import qualified Test.DataPoint.Forward.Demo.Tests as Demo + +main :: IO () +main = defaultMain tests + +tests :: TestTree +tests = testGroup "trace-forward" + [ Protocol.tests + , Demo.tests + ] diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs new file mode 100644 index 00000000000..a7551e7f0c8 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs @@ -0,0 +1,37 @@ +module Test.DataPoint.Forward.Demo.Configs + ( mkAcceptorConfig + , mkForwarderConfig + ) where + +import Control.Tracer (nullTracer) +import GHC.Conc (TVar) + +import DataPoint.Forward.Configuration +import DataPoint.Forward.Protocol.Type + +import Test.DataPoint.Forward.Protocol.TraceItem + +mkAcceptorConfig + :: HowToConnect + -> TVar Bool + -> AcceptorConfiguration TraceItem +mkAcceptorConfig ep weAreDone = + AcceptorConfiguration + { acceptorTracer = nullTracer + , forwarderEndpoint = ep + , whatToRequest = NumberOfTraceObjects 10 + , shouldWeStop = weAreDone + } + +mkForwarderConfig + :: HowToConnect + -> Word + -> Word + -> ForwarderConfiguration TraceItem +mkForwarderConfig ep disconnectedSize connectedSize = + ForwarderConfiguration + { forwarderTracer = nullTracer + , acceptorEndpoint = ep + , disconnectedQueueSize = disconnectedSize + , connectedQueueSize = connectedSize + } diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs new file mode 100644 index 00000000000..0bd4d256460 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs @@ -0,0 +1,87 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.DataPoint.Forward.Demo.Tests + ( tests + ) where + +import Control.Concurrent.Async (withAsync) +import Data.Functor ((<&>)) +import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) +import GHC.Conc +import System.Directory (getTemporaryDirectory) +#if defined(mingw32_HOST_OS) +import System.FilePath ((), dropDrive) +import qualified Data.Text as T +#else +import System.FilePath (()) +#endif +import Test.Tasty +import Test.Tasty.QuickCheck +import System.Time.Extra (sleep) + +import Ouroboros.Network.IOManager (withIOManager) + +import DataPoint.Forward.Acceptor +import DataPoint.Forward.Configuration +import DataPoint.Forward.Forwarder +import DataPoint.Forward.Utils + +import Test.DataPoint.Forward.Demo.Configs +import Test.DataPoint.Forward.Protocol.Codec () +import Test.DataPoint.Forward.Protocol.TraceItem + +tests :: TestTree +tests = localOption (QuickCheckTests 1) $ testGroup "DataPoint.Forward.Demo" + [ testProperty "LocalPipe" $ prop_RemoteSocket 200 + ] + +prop_RemoteSocket :: Int -> Property +prop_RemoteSocket n = ioProperty . withIOManager $ \iomgr -> do + ep <- LocalPipe <$> mkLocalPipePath + + acceptedItems :: IORef [TraceItem] <- newIORef [] + weAreDone <- newTVarIO False + let forwarderConfig = mkForwarderConfig ep (fromIntegral n) (fromIntegral n) + sink <- initForwardSink forwarderConfig + + itemsToForward <- generateNTraceItems n + + withAsync (runTraceAcceptor + iomgr + (mkAcceptorConfig ep weAreDone) + (traceItemsHandler acceptedItems)) $ \_ -> do + sleep 0.5 + withAsync (runTraceForwarder iomgr forwarderConfig sink) $ \_ -> do + mapM_ (writeToSink sink) itemsToForward + -- Just wait till the acceptor will ask and receive all 'TraceItem's from the forwarder. + waitForFinish acceptedItems n weAreDone + + -- Take accepted items and compare results. + acceptedItems' <- readIORef acceptedItems + return $ itemsToForward === acceptedItems' + +traceItemsHandler :: IORef [TraceItem] -> [TraceItem] -> IO () +traceItemsHandler acceptedItems' items = do + atomicModifyIORef' acceptedItems' $ \storedItems -> (storedItems ++ items, ()) + +generateNTraceItems :: Int -> IO [TraceItem] +generateNTraceItems n = generate (infiniteListOf arbitrary) <&> take n + +waitForFinish :: IORef [TraceItem] -> Int -> TVar Bool -> IO () +waitForFinish acceptedItems' n weAreDone' = do + items' <- readIORef acceptedItems' + if length items' < n + then sleep 0.001 >> waitForFinish acceptedItems' n weAreDone' + else atomically $ writeTVar weAreDone' True + +mkLocalPipePath :: IO FilePath +mkLocalPipePath = do + tmpDir <- getTemporaryDirectory +#if defined(mingw32_HOST_OS) + return $ "\\\\.\\pipe\\" <> (T.unpack . T.replace "\\" "-" . T.pack) (dropDrive tmpDir) + <> "_" "trace-forward-test" +#else + return $ tmpDir "trace-forward-test.sock" +#endif diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs new file mode 100644 index 00000000000..6b978e76e2b --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs @@ -0,0 +1,41 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} + +module Test.DataPoint.Forward.Protocol.Codec () where + +import Data.Time.Calendar (fromGregorian) +import Data.Time.Clock (UTCTime (..)) +import Test.QuickCheck + +import Network.TypedProtocol.Core +import Network.TypedProtocol.Codec + +import DataPoint.Forward.Protocol.Type + +import Test.DataPoint.Forward.Protocol.TraceItem + +instance Arbitrary NumberOfTraceObjects where + arbitrary = NumberOfTraceObjects <$> arbitrary + +instance Arbitrary (AnyMessageAndAgency (TraceForward TraceItem)) where + arbitrary = oneof + [ AnyMessageAndAgency (ClientAgency TokIdle) . MsgTraceObjectsRequest TokBlocking <$> arbitrary + , AnyMessageAndAgency (ClientAgency TokIdle) . MsgTraceObjectsRequest TokNonBlocking <$> arbitrary + , AnyMessageAndAgency (ServerAgency (TokBusy TokBlocking)) . MsgTraceObjectsReply . BlockingReply <$> arbitrary + , AnyMessageAndAgency (ServerAgency (TokBusy TokNonBlocking)) . MsgTraceObjectsReply . NonBlockingReply <$> arbitrary + , pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgDone + ] + +instance Eq (AnyMessage (TraceForward TraceItem)) where + AnyMessage (MsgTraceObjectsRequest TokBlocking r1) + == AnyMessage (MsgTraceObjectsRequest TokBlocking r2) = r1 == r2 + AnyMessage (MsgTraceObjectsRequest TokNonBlocking r1) + == AnyMessage (MsgTraceObjectsRequest TokNonBlocking r2) = r1 == r2 + AnyMessage (MsgTraceObjectsReply (BlockingReply r1)) + == AnyMessage (MsgTraceObjectsReply (BlockingReply r2)) = r1 == r2 + AnyMessage (MsgTraceObjectsReply (NonBlockingReply r1)) + == AnyMessage (MsgTraceObjectsReply (NonBlockingReply r2)) = r1 == r2 + AnyMessage MsgDone + == AnyMessage MsgDone = True + _ == _ = False diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs new file mode 100644 index 00000000000..3c339515119 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs @@ -0,0 +1,28 @@ +module Test.DataPoint.Forward.Protocol.Tests + ( tests + ) where + +import qualified Codec.Serialise as CBOR +import Control.Monad.ST (runST) +import Test.Tasty +import Test.Tasty.QuickCheck + +import Network.TypedProtocol.Codec + +import DataPoint.Forward.Protocol.Codec +import DataPoint.Forward.Protocol.Type + +import Test.DataPoint.Forward.Protocol.Codec () +import Test.DataPoint.Forward.Protocol.TraceItem + +tests :: TestTree +tests = testGroup "DataPoint.Forward.Protocol" + [ testProperty "codec" prop_codec_TraceForward + ] + +prop_codec_TraceForward :: AnyMessageAndAgency (TraceForward TraceItem) -> Bool +prop_codec_TraceForward msg = + runST $ prop_codecM + (codecTraceForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Protocol/TraceItem.hs b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/TraceItem.hs new file mode 100644 index 00000000000..d550e671889 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/TraceItem.hs @@ -0,0 +1,74 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE OverloadedStrings #-} + +module Test.DataPoint.Forward.Protocol.TraceItem + ( TraceItem (..) + ) where + +import Codec.Serialise (Serialise (..)) +import Data.List.NonEmpty (NonEmpty, fromList) +import Data.Text (Text) +import Data.Time (UTCTime (..), fromGregorian) +import GHC.Generics +import Test.QuickCheck + +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +data Severity = Debug | Info | Notice + deriving (Show, Eq, Ord, Enum, Generic) + +instance Arbitrary Severity where + arbitrary = oneof + [ pure Debug + , pure Info + , pure Notice + ] + +instance Serialise Severity + +data DetailLevel = Brief | Regular + deriving (Show, Eq, Ord, Enum, Generic) + +instance Arbitrary DetailLevel where + arbitrary = oneof + [ pure Brief + , pure Regular + ] + +instance Serialise DetailLevel + +instance Arbitrary UTCTime where + arbitrary = oneof + [ pure $ UTCTime (fromGregorian 2021 7 24) ((22 * 3600) + (15 * 60) + 1) + , pure $ UTCTime (fromGregorian 2021 7 25) ((12 * 3600) + (4 * 60) + 37) + , pure $ UTCTime (fromGregorian 2021 7 26) ((23 * 3600) + (19 * 60) + 56) + ] + +-- | Trace items that will be used during testing. +-- This type is similar to the real 'TraceObject' that will be used by the node. +data TraceItem = TraceItem + { tiHuman :: Maybe String + , tiNamespace :: [String] + , tiSeverity :: Severity + , tiDetails :: DetailLevel + , tiTimestamp :: UTCTime + , tiHostname :: String + , tiThreadId :: Text + } deriving (Eq, Ord, Show, Generic) + +instance Serialise TraceItem +instance ShowProxy TraceItem + +instance Arbitrary TraceItem where + arbitrary = TraceItem + <$> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary + <*> oneof [pure "1", pure "10"] + +instance Arbitrary (NonEmpty TraceItem) where + arbitrary = fromList <$> listOf1 arbitrary