diff --git a/cabal.project b/cabal.project index b1092acf95a..13c1ea3d16c 100644 --- a/cabal.project +++ b/cabal.project @@ -73,6 +73,9 @@ package cardano-submit-api package cardano-testnet tests: True +package cardano-tracer + tests: True + package plutus-examples tests: True @@ -273,8 +276,14 @@ source-repository-package source-repository-package type: git location: https://github.com/input-output-hk/ekg-forward - tag: d9e8fd302fa6ba41c07183d371e6777286d37bc2 - --sha256: 0s8cblhq3i528jj7r7yd4v82nqzafj8vrgf0y80l7saxc3a5f2lk + tag: b50b215651822fa0d5f5dd2fc28b57ca2b33baa4 + --sha256: 1dlknw83cmya9z8s2iwi35ymhv1yip5wvdh8p3jhmx9yv89apkq2 + +source-repository-package + type: git + location: https://github.com/HeinrichApfelmus/threepenny-gui + tag: e3bb8283fc7d2e8aa374eea29426002e8dcd67a8 + --sha256: 0nf836b552asgpwn2gxwl7yd7ssdhb1wkvdqz6s4dpzqnlpyivx9 source-repository-package type: git diff --git a/cardano-tracer/CODEOWNERS b/cardano-tracer/CODEOWNERS new file mode 100644 index 00000000000..84587676f58 --- /dev/null +++ b/cardano-tracer/CODEOWNERS @@ -0,0 +1,3 @@ +# General reviewers per PR +# Denis Serge +* @denisshevchenko @deepfire diff --git a/cardano-tracer/README.md b/cardano-tracer/README.md index e9dc8b160e4..22b5bde5e51 100644 --- a/cardano-tracer/README.md +++ b/cardano-tracer/README.md @@ -1,5 +1,5 @@ -# cardano-tracer +# Cardano Tracer -`cardano-tracer` is an application for logging and monitoring over `cardano-node`. After it is connected to the node, it periodically asks the node for different information, receives it, and handles it. +`cardano-tracer` is a service for logging and monitoring over `cardano-node`. After it is connected to the node, it periodically asks the node for different information, receives it, and handles it. -For more details please [read the documentation](https://github.com/input-output-hk/cardano-node/blob/tracing-master/cardano-tracer/docs/cardano-tracer.md). +For more details please [read the documentation](https://github.com/input-output-hk/cardano-node/blob/master/cardano-tracer/docs/cardano-tracer.md). diff --git a/cardano-tracer/cardano-tracer.cabal b/cardano-tracer/cardano-tracer.cabal index a2f4edf2b3b..0c0d1200bc7 100644 --- a/cardano-tracer/cardano-tracer.cabal +++ b/cardano-tracer/cardano-tracer.cabal @@ -1,7 +1,7 @@ cabal-version: 2.4 name: cardano-tracer version: 0.1.0 -synopsis: See README for more info +synopsis: A service for logging and monitoring over Cardano node. description: See README for more info license: Apache-2.0 license-file: LICENSE @@ -55,6 +55,7 @@ library build-depends: aeson , async + , async-extras , blaze-html , blaze-markup , bytestring @@ -74,6 +75,7 @@ library , snap-server , stm , text + , threepenny-gui , time , trace-dispatcher , trace-forward @@ -158,7 +160,6 @@ test-suite cardano-tracer-test , filepath , ouroboros-network-framework , QuickCheck - , stm , tasty , tasty-quickcheck , text diff --git a/cardano-tracer/configuration/complete-example.json b/cardano-tracer/configuration/complete-example.json new file mode 100644 index 00000000000..7808727f8fc --- /dev/null +++ b/cardano-tracer/configuration/complete-example.json @@ -0,0 +1,39 @@ +{ + "network": { + "tag": "AcceptAt", + "contents": "/tmp/forwarder.sock" + }, + "loRequestNum": 100, + "ekgRequestFreq": 2, + "hasEKG": [ + { + "epHost": "127.0.0.1", + "epPort": 3100 + }, + { + "epHost": "127.0.0.1", + "epPort": 3101 + } + ], + "hasPrometheus": { + "epHost": "127.0.0.1", + "epPort": 3000 + }, + "logging": [ + { + "logRoot": "/tmp/cardano-tracer-h-logs", + "logMode": "FileMode", + "logFormat": "ForHuman" + }, + { + "logRoot": "/tmp/cardano-tracer-m-logs", + "logMode": "FileMode", + "logFormat": "ForMachine" + } + ], + "rotation": { + "rpKeepFilesNum": 1, + "rpLogLimitBytes": 50000, + "rpMaxAgeHours": 1 + } +} diff --git a/cardano-tracer/configuration/example.json b/cardano-tracer/configuration/minimal-example.json similarity index 60% rename from cardano-tracer/configuration/example.json rename to cardano-tracer/configuration/minimal-example.json index 6b33c0ca241..97f95ba1049 100644 --- a/cardano-tracer/configuration/example.json +++ b/cardano-tracer/configuration/minimal-example.json @@ -3,18 +3,11 @@ "tag": "ConnectTo", "contents": ["/tmp/forwarder.sock"] }, - "hasEKG": null, - "hasPrometheus": null, "logging": [ { "logRoot": "/tmp/cardano-tracer-logs", "logMode": "FileMode", "logFormat": "ForMachine" } - ], - "rotation": { - "rpKeepFilesNum": 1, - "rpLogLimitBytes": 50000, - "rpMaxAgeHours": 1 - } + ] } diff --git a/cardano-tracer/docs/cardano-tracer.md b/cardano-tracer/docs/cardano-tracer.md index 4f74296c803..3a7efbf2e0f 100644 --- a/cardano-tracer/docs/cardano-tracer.md +++ b/cardano-tracer/docs/cardano-tracer.md @@ -1,6 +1,6 @@ -# cardano-tracer +# Cardano Tracer -`cardano-tracer` is an application for logging and monitoring over `cardano-node`. After it is connected to the node, it periodically asks the node for different information, receives it, and handles it. +`cardano-tracer` is a service for logging and monitoring over `cardano-node`. After it is connected to the node, it periodically asks the node for different information, receives it, and handles it. # Contents @@ -20,18 +20,19 @@ ## Motivation -Previously, `cardano-node` handled all the logging by itself. Moreover, it provided monitoring tools as well, for example, a web page for EKG metrics. `cardano-tracer` is an attempt to _move_ all logging/monitoring-related stuff from the node to a separate application. As a result, the node will be smaller, faster, and simpler. +Previously, `cardano-node` handled all the logging by itself. Moreover, it provided monitoring tools as well, for example, a web page for EKG metrics. `cardano-tracer` is a result of _moving_ all the logging/monitoring-related stuff from the node to a separate service. As a result, the node became smaller, faster, and simpler. ## Overview You can think of `cardano-node` as a **producer** of logging/monitoring information, and `cardano-tracer` as a **consumer** of this information. And after the network connection between them is established, `cardano-tracer` periodically asks for logging/monitoring information, and `cardano-node` replies with it. -Currently, this information is presented in two items: +There are three kinds of such an information: -1. Trace object, which contains arbitrary information from the node. For more details, please see `Cardano.Logging.Types.TraceObject` from `trace-dispatcher` library. -2. EKG metric, which contains some system metric. Please [read the documentation](https://hackage.haskell.org/package/ekg-core) for more info. +1. Trace object, which contains different logging data. `cardano-tracer` asks for new trace objects each `N` seconds, receives them and stores them in the log files and/or in Linux journal. +2. EKG metric, which contains some system metric. Please [read EKG documentation](https://hackage.haskell.org/package/ekg-core) for more info. `cardano-tracer` asks for new EKG metrics each `N` seconds, receives them and displays them using monitoring tools. +3. Data points, which contains arbitrary information about the node. `cardano-tracer` asks for new data points only by _explicit_ request, there is no "stream of data points". -Please note that `cardano-tracer` can work as an aggregator as well: _one_ `cardano-tracer` process can receive the information from _multiple_ `cardano-node` processes. In this case, received logging information will be stored in subdirectories: each subdirectory will contain all the items received from the particular node. +Please note that `cardano-tracer` can work as an aggregator as well: _one_ `cardano-tracer` process can receive the information from _multiple_ nodes. # Build and run @@ -55,7 +56,7 @@ Then you can run `cardano-tracer` using the following command: cabal exec -- cardano-tracer --config /path/to/your/config.json ``` -Please see below an explanation about the configuration file. +Please see below an explanation about the configuration file. You can find an example of the configuration file in `configuration` subdirectory. # Configuration @@ -64,9 +65,11 @@ The way how to configure `cardano-tracer` is depending on your requirements. The 1. **Distributed** scenario, when `cardano-tracer` is working on one machine, and your nodes are working on another machine(s). 2. **Local** scenario, when `cardano-tracer` and your nodes are working on the same machine. -Distributed scenario is for real-world case: for example, you have a cluster from `N` nodes working on `N` different AWS-instances, and you want to collect all the logging/monitoring information from these nodes using one single `cardano-tracer` process working on your machine. So, by default you should consider using distributed scenario. +Distributed scenario is for real-world case: for example, you have a cluster from `N` nodes working on `N` different AWS-instances, and you want to collect all the logging/monitoring information from these nodes using one `cardano-tracer` process working on your machine. So, by default you should consider using distributed scenario. -Local scenario is for testing case: for example, you want to try your new infrastructure from scratch, so you run `N` nodes and one `cardano-tracer` process on your laptop. +Local scenario is for testing case: for example, you want to try your new infrastructure from scratch, so you run `N` nodes and one `cardano-tracer` process on your machine. + +**IMPORTANT NOTICE**: Please note that `cardano-tracer` **does not** support connection via IP-address and port, to avoid unauthorized connections. The only way to establish connection with the node is a local socket. ## Distributed Scenario @@ -87,7 +90,7 @@ machine A machine B machine C machine D ``` -The minimalistic configuration file for `cardano-tracer` in such a scenario would be: +The minimalistic configuration file for the tracer would be: ``` { @@ -105,10 +108,31 @@ The minimalistic configuration file for `cardano-tracer` in such a scenario woul } ``` -The `network` field specifies the way how `cardano-tracer` will be connected to your nodes. Here you see `AcceptAt` tag, which means that `cardano-tracer` works as a server: it _accepts_ network connections by listening the local Unix socket `/tmp/cardano-tracer.sock`. But if `cardano-tracer` _accepts_ the connections - who should _initiate_ them? The node cannot do that, because it listens its local Unix socket too. To do that, we use SSH forwarding. Please note that `cardano-tracer` **does not** support connection via IP-address and port, to avoid unauthorized connections, that's why we need `ssh` for distributed scenario. +The `network` field specifies the way how `cardano-tracer` will be connected with your nodes. Here you see `AcceptAt` tag, which means that `cardano-tracer` works as a server: it _accepts_ network connections by listening the local socket `/tmp/cardano-tracer.sock`. And your nodes work as clients: they _initiate_ network connections using their local sockets. It can be shown like this: -It can be shown like this: +``` +machine A machine B machine C ++----------------------+ +----------------------+ +----------------------+ +| cardano-node | | cardano-node | | cardano-node | +| \ | | \ | | \ | +| v | | v | | v | +| local socket | | local socket | | local socket | ++----------------------+ +----------------------+ +----------------------+ + + + + + + +----------------------+ + | local socket | + | ^ | + | \ | + | cardano-tracer | + +----------------------+ + machine D +``` +To establish the real network connections between your machines, you need SSH forwarding: ``` machine A machine B machine C @@ -127,13 +151,21 @@ machine A machine B machine C | local socket | | ^ | | \ | - | \ | | cardano-tracer | +----------------------+ machine D ``` -In this case, `ssh` connects each node with the same `cardano-tracer`. The idea of SSH forwarding is simple: we connect not the processes directly, but their network endpoints instead. You can think of it as a network channel from the local socket on one machine to the local socket on another machine. So, to connect `cardano-node` working on machine `A` with `cardano-tracer` working on machine `D`, run this command on machine `A`: +The idea of SSH forwarding is simple: we connect not the processes directly, but their network endpoints instead. You can think of it as a network channel from the local socket on one machine to the local socket on another machine: + +``` +machine A machine D ++----------------------------------+ +------------------------------------+ +| cardano-node ---> local socket <-|---SSH channel---|-> local socket <--- cardano-tracer | ++----------------------------------+ +------------------------------------+ +``` + +So, to connect `cardano-node` working on machine `A` with `cardano-tracer` working on machine `D`, run this command on machine `A`: ``` ssh -nNT -L /tmp/cardano-tracer.sock:/tmp/cardano-node.sock -o "ExitOnForwardFailure yes" john@109.75.33.121 @@ -141,8 +173,8 @@ ssh -nNT -L /tmp/cardano-tracer.sock:/tmp/cardano-node.sock -o "ExitOnForwardFai where: -- `/tmp/cardano-tracer.sock` is a path to the local Unix socket on machine `A`, -- `/tmp/cardano-node.sock` is a path to the local Unix socket on machine `D`, +- `/tmp/cardano-tracer.sock` is a path to the local socket on machine `A`, +- `/tmp/cardano-node.sock` is a path to the local socket on machine `D`, - `john` is a user you use to login on machine `D`, - `109.75.33.121` is an IP-adress of machine `D`. @@ -150,7 +182,7 @@ Run the same command on machines `B` and `C` to connect corresponding nodes with ## Local Scenario -As was mentioned above, local scenario is for testing, when your nodes and `cardano-tracer` are working on the same machine. In this case all these processes can see the same local sockets directly, so we don't need `ssh`. The configuration file for the local cluster from 3 nodes will look like this this: +As was mentioned above, local scenario is for testing only, when your nodes and `cardano-tracer` are working on the same machine. In this case all these processes can see the same local sockets directly, so we don't need `ssh`. The configuration file for the local cluster from 3 nodes would look like this: ``` { @@ -172,7 +204,7 @@ As was mentioned above, local scenario is for testing, when your nodes and `card } ``` -As you see, the tag is `ConnectTo` now, which means that `cardano-tracer` works as a client: it _establishes_ network connections with the node via the local Unix sockets `/tmp/cardano-node-*.sock`. Please make sure your local nodes are listening corresponding sockets. +As you see, the tag is `ConnectTo` now, which means that `cardano-tracer` works as a client: it _establishes_ network connections with the node via the local sockets `/tmp/cardano-node-*.sock`. Please make sure your local nodes are listening corresponding sockets. Let's explore other fields of the configuration file. diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs index 1cba50b3de4..61f9227189d 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs @@ -7,26 +7,24 @@ module Cardano.Tracer.Acceptors.Client import Codec.CBOR.Term (Term) import qualified Data.ByteString.Lazy as LBS import Data.Void (Void) -import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), - MiniProtocolNum (..), MuxMode (..), - OuroborosApplication (..), - RunMiniProtocol (..), - miniProtocolLimits, miniProtocolNum, miniProtocolRun) +import Ouroboros.Network.Mux ( + MiniProtocol (..), MiniProtocolLimits (..), + MiniProtocolNum (..), MuxMode (..), + OuroborosApplication (..), RunMiniProtocol (..), + miniProtocolLimits, miniProtocolNum, miniProtocolRun) import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) import Ouroboros.Network.IOManager (withIOManager) import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) -import Ouroboros.Network.Socket (ConnectionId (..), connectToNode, - nullNetworkConnectTracers) -import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, - noTimeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), - UnversionedProtocolData (..), - unversionedHandshakeCodec, - unversionedProtocolDataCodec) +import Ouroboros.Network.Socket ( + ConnectionId (..), connectToNode, nullNetworkConnectTracers) +import Ouroboros.Network.Protocol.Handshake.Codec ( + cborTermVersionDataCodec, noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.Handshake.Unversioned ( + UnversionedProtocol (..), UnversionedProtocolData (..), + unversionedHandshakeCodec, unversionedProtocolDataCodec) import Ouroboros.Network.Protocol.Handshake.Type (Handshake) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, - simpleSingletonVersions) -import System.IO.Unsafe (unsafePerformIO) +import Ouroboros.Network.Protocol.Handshake.Version ( + acceptableVersion, simpleSingletonVersions) import Cardano.Logging (TraceObject) @@ -42,6 +40,7 @@ import Cardano.Tracer.Acceptors.Utils import Cardano.Tracer.Configuration import Cardano.Tracer.Handlers.Logs.TraceObjects import Cardano.Tracer.Types +import Cardano.Tracer.Utils runAcceptorsClient :: TracerConfig @@ -50,18 +49,18 @@ runAcceptorsClient , TF.AcceptorConfiguration TraceObject , DPF.AcceptorConfiguration ) + -> ConnectedNodes -> AcceptedMetrics - -> AcceptedNodeInfo -> DataPointAskers -> IO () -runAcceptorsClient config p (ekgConfig, tfConfig, dpfConfig) acceptedMetrics acceptedNodeInfo dpAskers = - withIOManager $ \iocp -> - doConnectToForwarder (localSnocket iocp) (localAddressFromPath p) noTimeLimitsHandshake $ - appInitiator - [ (runEKGAcceptorInit ekgConfig acceptedMetrics, 1) - , (runTraceObjectsAcceptorInit config tfConfig acceptedNodeInfo, 2) - , (runDataPointsAcceptorInit dpfConfig dpAskers, 3) - ] +runAcceptorsClient config p (ekgConfig, tfConfig, dpfConfig) + connectedNodes acceptedMetrics dpAskers = withIOManager $ \iocp -> + doConnectToForwarder (localSnocket iocp) (localAddressFromPath p) noTimeLimitsHandshake $ + appInitiator + [ (runEKGAcceptorInit ekgConfig connectedNodes acceptedMetrics, 1) + , (runTraceObjectsAcceptorInit config tfConfig connectedNodes, 2) + , (runDataPointsAcceptorInit dpfConfig connectedNodes dpAskers, 3) + ] where appInitiator protocols = OuroborosApplication $ \connectionId _shouldStopSTM -> @@ -97,28 +96,38 @@ doConnectToForwarder snocket address timeLimits app = runEKGAcceptorInit :: Show addr => EKGF.AcceptorConfiguration + -> ConnectedNodes -> AcceptedMetrics -> ConnectionId addr -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -runEKGAcceptorInit ekgConfig acceptedMetrics connId = do - let (ekgStore, localStore) = unsafePerformIO $ prepareMetricsStores acceptedMetrics connId - acceptEKGMetricsInit ekgConfig ekgStore localStore +runEKGAcceptorInit ekgConfig connectedNodes acceptedMetrics connId = + acceptEKGMetricsInit + ekgConfig + (prepareMetricsStores connectedNodes acceptedMetrics connId) + (handlePeerError connectedNodes connId) runTraceObjectsAcceptorInit :: Show addr => TracerConfig -> TF.AcceptorConfiguration TraceObject - -> AcceptedNodeInfo + -> ConnectedNodes -> ConnectionId addr -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -runTraceObjectsAcceptorInit config tfConfig acceptedNodeInfo connId = - acceptTraceObjectsInit tfConfig $ traceObjectsHandler config (connIdToNodeId connId) acceptedNodeInfo +runTraceObjectsAcceptorInit config tfConfig connectedNodes connId = + acceptTraceObjectsInit + tfConfig + (traceObjectsHandler config (connIdToNodeId connId)) + (handlePeerError connectedNodes connId) runDataPointsAcceptorInit :: Show addr => DPF.AcceptorConfiguration + -> ConnectedNodes -> DataPointAskers -> ConnectionId addr -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -runDataPointsAcceptorInit dpfConfig dpAskers connId = - acceptDataPointsInit dpfConfig $ prepareDataPointAsker dpAskers connId +runDataPointsAcceptorInit dpfConfig connectedNodes dpAskers connId = + acceptDataPointsInit + dpfConfig + (prepareDataPointAsker connectedNodes dpAskers connId) + (handlePeerError connectedNodes connId) diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs index f975f5ff46e..bd9d3e0e7f9 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs @@ -3,100 +3,64 @@ module Cardano.Tracer.Acceptors.Run ( runAcceptors - , runAcceptorsWithBrake ) where import Control.Concurrent.Async (forConcurrently_) -import Control.Concurrent.STM.TVar (TVar, newTVarIO) import "contra-tracer" Control.Tracer (nullTracer) import qualified Data.List.NonEmpty as NE import Data.Maybe (fromMaybe) import Data.Time.Clock (secondsToNominalDiffTime) -import Cardano.Logging (TraceObject) - import qualified Trace.Forward.Configuration.DataPoint as DPF -import qualified Trace.Forward.Configuration.TraceObject as TF -import qualified Trace.Forward.Protocol.TraceObject.Type as TF +import qualified Trace.Forward.Configuration.TraceObject as TOF +import qualified Trace.Forward.Protocol.TraceObject.Type as TOF import qualified System.Metrics.Configuration as EKGF import qualified System.Metrics.ReqResp as EKGF -import Cardano.Tracer.Acceptors.Client (runAcceptorsClient) -import Cardano.Tracer.Acceptors.Server (runAcceptorsServer) +import Cardano.Tracer.Acceptors.Client +import Cardano.Tracer.Acceptors.Server import Cardano.Tracer.Configuration import Cardano.Tracer.Types -import Cardano.Tracer.Utils (runInLoop) +import Cardano.Tracer.Utils runAcceptors :: TracerConfig + -> ConnectedNodes -> AcceptedMetrics - -> AcceptedNodeInfo - -> DataPointAskers - -> IO () -runAcceptors config@TracerConfig{network} acceptedMetrics acceptedNodeInfo dpAskers = - case network of - AcceptAt (LocalSocket p) -> do - stopProtocols <- newTVarIO False - let configs = mkAcceptorsConfigs config p stopProtocols - runInLoop - (runAcceptorsServer config p configs acceptedMetrics acceptedNodeInfo dpAskers) - p 1 - ConnectTo localSocks -> - forConcurrently_ (NE.nub localSocks) $ \(LocalSocket p) -> do - stopProtocols <- newTVarIO False - let configs = mkAcceptorsConfigs config p stopProtocols - runInLoop - (runAcceptorsClient config p configs acceptedMetrics acceptedNodeInfo dpAskers) - p 1 - -runAcceptorsWithBrake - :: TracerConfig - -> AcceptedMetrics - -> AcceptedNodeInfo -> DataPointAskers - -> TVar Bool + -> ProtocolsBrake -> IO () -runAcceptorsWithBrake config@TracerConfig{network} - acceptedMetrics acceptedNodeInfo dpAskers stopProtocols = +runAcceptors c@TracerConfig{network, ekgRequestFreq, loRequestNum} + connectedNodes acceptedMetrics dpAskers stopIt = case network of - AcceptAt (LocalSocket p) -> do - let configs = mkAcceptorsConfigs config p stopProtocols + AcceptAt (LocalSocket p) -> runInLoop - (runAcceptorsServer config p configs acceptedMetrics acceptedNodeInfo dpAskers) + (runAcceptorsServer c p (acceptorsConfigs p) connectedNodes acceptedMetrics dpAskers) p 1 ConnectTo localSocks -> - forConcurrently_ (NE.nub localSocks) $ \(LocalSocket p) -> do - let configs = mkAcceptorsConfigs config p stopProtocols + forConcurrently_ (NE.nub localSocks) $ \(LocalSocket p) -> runInLoop - (runAcceptorsClient config p configs acceptedMetrics acceptedNodeInfo dpAskers) + (runAcceptorsClient c p (acceptorsConfigs p) connectedNodes acceptedMetrics dpAskers) p 1 - -mkAcceptorsConfigs - :: TracerConfig - -> FilePath - -> TVar Bool - -> ( EKGF.AcceptorConfiguration - , TF.AcceptorConfiguration TraceObject - , DPF.AcceptorConfiguration - ) -mkAcceptorsConfigs TracerConfig{ekgRequestFreq, loRequestNum} p stopProtocols = - ( EKGF.AcceptorConfiguration - { EKGF.acceptorTracer = nullTracer - , EKGF.forwarderEndpoint = EKGF.LocalPipe p - , EKGF.requestFrequency = secondsToNominalDiffTime $ fromMaybe 1.0 ekgRequestFreq - , EKGF.whatToRequest = EKGF.GetAllMetrics - , EKGF.shouldWeStop = stopProtocols - } - , TF.AcceptorConfiguration - { TF.acceptorTracer = nullTracer - , TF.forwarderEndpoint = p - , TF.whatToRequest = TF.NumberOfTraceObjects $ fromMaybe 100 loRequestNum - , TF.shouldWeStop = stopProtocols - } - , DPF.AcceptorConfiguration - { DPF.acceptorTracer = nullTracer - , DPF.forwarderEndpoint = p - , DPF.shouldWeStop = stopProtocols - } - ) + where + acceptorsConfigs p = + ( EKGF.AcceptorConfiguration + { EKGF.acceptorTracer = nullTracer + , EKGF.forwarderEndpoint = EKGF.LocalPipe p + , EKGF.requestFrequency = secondsToNominalDiffTime $ fromMaybe 1.0 ekgRequestFreq + , EKGF.whatToRequest = EKGF.GetAllMetrics + , EKGF.shouldWeStop = stopIt + } + , TOF.AcceptorConfiguration + { TOF.acceptorTracer = nullTracer + , TOF.forwarderEndpoint = p + , TOF.whatToRequest = TOF.NumberOfTraceObjects $ fromMaybe 100 loRequestNum + , TOF.shouldWeStop = stopIt + } + , DPF.AcceptorConfiguration + { DPF.acceptorTracer = nullTracer + , DPF.forwarderEndpoint = p + , DPF.shouldWeStop = stopIt + } + ) diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs index 845d9948f65..7657f49d7b3 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs @@ -31,7 +31,6 @@ import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedPr import Ouroboros.Network.Protocol.Handshake.Type (Handshake) import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions) -import System.IO.Unsafe (unsafePerformIO) import Cardano.Logging (TraceObject) @@ -41,12 +40,13 @@ import Trace.Forward.Run.DataPoint.Acceptor (acceptDataPointsResp) import Trace.Forward.Run.TraceObject.Acceptor (acceptTraceObjectsResp) import qualified System.Metrics.Configuration as EKGF -import System.Metrics.Network.Acceptor (acceptEKGMetrics) +import System.Metrics.Network.Acceptor (acceptEKGMetricsResp) import Cardano.Tracer.Acceptors.Utils import Cardano.Tracer.Configuration import Cardano.Tracer.Handlers.Logs.TraceObjects import Cardano.Tracer.Types +import Cardano.Tracer.Utils runAcceptorsServer :: TracerConfig @@ -55,18 +55,18 @@ runAcceptorsServer , TF.AcceptorConfiguration TraceObject , DPF.AcceptorConfiguration ) + -> ConnectedNodes -> AcceptedMetrics - -> AcceptedNodeInfo -> DataPointAskers -> IO () -runAcceptorsServer config p (ekgConfig, tfConfig, dpfConfig) acceptedMetrics acceptedNodeInfo dpAskers = - withIOManager $ \iocp -> do - doListenToForwarder (localSnocket iocp) (localAddressFromPath p) noTimeLimitsHandshake $ - appResponder - [ (runEKGAcceptor ekgConfig acceptedMetrics, 1) - , (runTraceObjectsAcceptor config tfConfig acceptedNodeInfo, 2) - , (runDataPointsAcceptor dpfConfig dpAskers, 3) - ] +runAcceptorsServer config p (ekgConfig, tfConfig, dpfConfig) + connectedNodes acceptedMetrics dpAskers = withIOManager $ \iocp -> + doListenToForwarder (localSnocket iocp) (localAddressFromPath p) noTimeLimitsHandshake $ + appResponder + [ (runEKGAcceptor ekgConfig connectedNodes acceptedMetrics, 1) + , (runTraceObjectsAcceptor config tfConfig connectedNodes, 2) + , (runDataPointsAcceptor dpfConfig connectedNodes dpAskers, 3) + ] where appResponder protocols = OuroborosApplication $ \connectionId _shouldStopSTM -> @@ -104,34 +104,43 @@ doListenToForwarder snocket address timeLimits app = do (SomeResponderApplication app) ) nullErrorPolicies - $ \_ serverAsync -> do - wait serverAsync -- Block until async exception. + $ \_ serverAsync -> wait serverAsync -- Block until async exception. runEKGAcceptor :: Show addr => EKGF.AcceptorConfiguration + -> ConnectedNodes -> AcceptedMetrics -> ConnectionId addr -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -runEKGAcceptor ekgConfig acceptedMetrics connId = do - let (ekgStore, localStore) = unsafePerformIO $ prepareMetricsStores acceptedMetrics connId - acceptEKGMetrics ekgConfig ekgStore localStore +runEKGAcceptor ekgConfig connectedNodes acceptedMetrics connId = + acceptEKGMetricsResp + ekgConfig + (prepareMetricsStores connectedNodes acceptedMetrics connId) + (handlePeerError connectedNodes connId) runTraceObjectsAcceptor :: Show addr => TracerConfig -> TF.AcceptorConfiguration TraceObject - -> AcceptedNodeInfo + -> ConnectedNodes -> ConnectionId addr -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -runTraceObjectsAcceptor config tfConfig acceptedNodeInfo connId = - acceptTraceObjectsResp tfConfig $ traceObjectsHandler config (connIdToNodeId connId) acceptedNodeInfo +runTraceObjectsAcceptor config tfConfig connectedNodes connId = + acceptTraceObjectsResp + tfConfig + (traceObjectsHandler config (connIdToNodeId connId)) + (handlePeerError connectedNodes connId) runDataPointsAcceptor :: Show addr => DPF.AcceptorConfiguration + -> ConnectedNodes -> DataPointAskers -> ConnectionId addr -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -runDataPointsAcceptor dpfConfig dpAskers connId = - acceptDataPointsResp dpfConfig $ prepareDataPointAsker dpAskers connId +runDataPointsAcceptor dpfConfig connectedNodes dpAskers connId = + acceptDataPointsResp + dpfConfig + (prepareDataPointAsker connectedNodes dpAskers connId) + (handlePeerError connectedNodes connId) diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs index 3a780764716..df96c7fca6c 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs @@ -1,12 +1,15 @@ module Cardano.Tracer.Acceptors.Utils ( prepareDataPointAsker , prepareMetricsStores + , handlePeerError ) where import Control.Concurrent.STM (atomically) -import Control.Concurrent.STM.TVar (TVar, newTVarIO, modifyTVar', readTVarIO) -import Control.Monad (unless) +import Control.Concurrent.STM.TVar (TVar, newTVarIO, modifyTVar', + readTVar, readTVarIO) +import Control.Monad (unless, when) import qualified Data.Map.Strict as M +import qualified Data.Set as S import qualified System.Metrics as EKG import Ouroboros.Network.Socket (ConnectionId (..)) @@ -16,13 +19,16 @@ import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsL import Trace.Forward.Utils.DataPoint (DataPointAsker, initDataPointAsker) import Cardano.Tracer.Types +import Cardano.Tracer.Utils prepareDataPointAsker :: Show addr - => DataPointAskers + => ConnectedNodes + -> DataPointAskers -> ConnectionId addr -> IO DataPointAsker -prepareDataPointAsker dpAskers connId = do +prepareDataPointAsker connectedNodes dpAskers connId = do + addNode connectedNodes connId let nodeId = connIdToNodeId connId dpAsker <- initDataPointAsker atomically $ modifyTVar' dpAskers $ \askers -> @@ -33,10 +39,12 @@ prepareDataPointAsker dpAskers connId = do prepareMetricsStores :: Show addr - => AcceptedMetrics + => ConnectedNodes + -> AcceptedMetrics -> ConnectionId addr -> IO (EKG.Store, TVar MetricsLocalStore) -prepareMetricsStores acceptedMetrics connId = do +prepareMetricsStores connectedNodes acceptedMetrics connId = do + addNode connectedNodes connId let nodeId = connIdToNodeId connId prepareAcceptedMetricsForNewNode nodeId metrics <- readTVarIO acceptedMetrics @@ -49,3 +57,34 @@ prepareMetricsStores acceptedMetrics connId = do (,) <$> EKG.newStore <*> newTVarIO emptyMetricsLocalStore atomically $ modifyTVar' acceptedMetrics $ M.insert nodeId storesForNewNode + +-- | This handler is called when 'runPeer' function throws an exception. +handlePeerError + :: Show addr + => ConnectedNodes + -> ConnectionId addr + -> IO () +handlePeerError = + removeNode -- There is a problem with network connection, so remove this node. + +addNode + :: Show addr + => ConnectedNodes + -> ConnectionId addr + -> IO () +addNode connectedNodes connId = atomically $ do + let nodeId = connIdToNodeId connId + nodes <- readTVar connectedNodes + when (nodeId `S.notMember` nodes) $ + modifyTVar' connectedNodes $ S.insert nodeId + +removeNode + :: Show addr + => ConnectedNodes + -> ConnectionId addr + -> IO () +removeNode connectedNodes connId = atomically $ do + let nodeId = connIdToNodeId connId + nodes <- readTVar connectedNodes + when (nodeId `S.member` nodes) $ + modifyTVar' connectedNodes $ S.delete nodeId diff --git a/cardano-tracer/src/Cardano/Tracer/CLI.hs b/cardano-tracer/src/Cardano/Tracer/CLI.hs index 222c6dc24b9..81ae0d33e37 100644 --- a/cardano-tracer/src/Cardano/Tracer/CLI.hs +++ b/cardano-tracer/src/Cardano/Tracer/CLI.hs @@ -5,7 +5,7 @@ module Cardano.Tracer.CLI import Options.Applicative --- | Type for CLI parameters required for the 'cardano-tracer'. +-- | CLI parameters required for the 'cardano-tracer'. newtype TracerParams = TracerParams { tracerConfig :: FilePath } diff --git a/cardano-tracer/src/Cardano/Tracer/Configuration.hs b/cardano-tracer/src/Cardano/Tracer/Configuration.hs index f77ed8ab9c2..dcfff6affee 100644 --- a/cardano-tracer/src/Cardano/Tracer/Configuration.hs +++ b/cardano-tracer/src/Cardano/Tracer/Configuration.hs @@ -32,8 +32,10 @@ newtype Address = LocalSocket FilePath deriving (Eq, Generic, FromJSON, Show, ToJSON) -- | Endpoint for internal services. -data Endpoint = Endpoint !Host !Port - deriving (Eq, Generic, FromJSON, Show, ToJSON) +data Endpoint = Endpoint + { epHost :: !Host + , epPort :: !Port + } deriving (Eq, Generic, FromJSON, Show, ToJSON) data RotationParams = RotationParams { rpLogLimitBytes :: !Word64 -- ^ Max size of file in bytes @@ -55,7 +57,7 @@ data LogFormat -- | Logging parameters. data LoggingParams = LoggingParams - { logRoot :: !FilePath -- ^ Root directory where all subdirs with logs will be created. + { logRoot :: !FilePath -- ^ Root directory where all subdirs with logs are created. , logMode :: !LogMode -- ^ Log mode. , logFormat :: !LogFormat -- ^ Log format. } deriving (Eq, Generic, FromJSON, Show, ToJSON) @@ -67,16 +69,16 @@ data Network -- | Complete configuration. data TracerConfig = TracerConfig - { network :: !Network -- ^ How cardano-tracer will be connected to node(s). - , loRequestNum :: !(Maybe Word16) -- ^ How many 'TraceObject's will be asked in each request. - , ekgRequestFreq :: !(Maybe Pico) -- ^ How often to request for EKG-metrics, in seconds. - , hasEKG :: !(Maybe Endpoint) -- ^ Endpoint for EKG web-page. - , hasPrometheus :: !(Maybe Endpoint) -- ^ Endpoint for Promeheus web-page. - , logging :: !(NonEmpty LoggingParams) -- ^ Logging parameters. - , rotation :: !(Maybe RotationParams) -- ^ Rotation parameters. + { network :: !Network -- ^ How cardano-tracer will be connected to node(s). + , loRequestNum :: !(Maybe Word16) -- ^ How many 'TraceObject's will be asked in each request. + , ekgRequestFreq :: !(Maybe Pico) -- ^ How often to request for EKG-metrics, in seconds. + , hasEKG :: !(Maybe (Endpoint, Endpoint)) -- ^ Endpoint for EKG web-page (list of nodes, monitoring). + , hasPrometheus :: !(Maybe Endpoint) -- ^ Endpoint for Promeheus web-page. + , logging :: !(NonEmpty LoggingParams) -- ^ Logging parameters. + , rotation :: !(Maybe RotationParams) -- ^ Rotation parameters. } deriving (Eq, Generic, FromJSON, Show, ToJSON) --- | Read the tracer's configuration file (path is passed via '--config' CLI option). +-- | Read the tracer's configuration file. readTracerConfig :: FilePath -> IO TracerConfig readTracerConfig pathToConfig = eitherDecodeFileStrict' pathToConfig >>= \case diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/File.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/File.hs index d20c372ffee..512bf83a0f3 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/File.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/File.hs @@ -14,7 +14,6 @@ import qualified Data.ByteString.Lazy as LBS import Data.Char (isDigit) import Data.Maybe (mapMaybe) import qualified Data.Text as T -import Data.Text (Text) import qualified Data.Text.Lazy as TL import Data.Text.Lazy.Encoding (encodeUtf8) import Data.Time.Clock (UTCTime) @@ -30,15 +29,14 @@ import Cardano.Tracer.Types writeTraceObjectsToFile :: NodeId - -> Text -> FilePath -> LogFormat -> [TraceObject] -> IO () -writeTraceObjectsToFile _ _ _ _ [] = return () -writeTraceObjectsToFile nodeId nodeName rootDir format traceObjects = do - pathToCurrentLog <- prepareLogsStructure nodeId nodeName rootDir format - unless (null itemsToWrite) $ +writeTraceObjectsToFile _ _ _ [] = return () +writeTraceObjectsToFile nodeId rootDir format traceObjects = + unless (null itemsToWrite) $ do + pathToCurrentLog <- prepareLogsStructure nodeId rootDir format LBS.appendFile pathToCurrentLog . encodeUtf8 . TL.concat $ itemsToWrite where itemsToWrite = @@ -48,11 +46,10 @@ writeTraceObjectsToFile nodeId nodeName rootDir format traceObjects = do prepareLogsStructure :: NodeId - -> Text -> FilePath -> LogFormat -> IO FilePath -prepareLogsStructure nodeId nodeName rootDir format = do +prepareLogsStructure (NodeId anId) rootDir format = do -- Root directory (as a parent for subDirForLogs) will be created as well if needed. createDirectoryIfMissing True subDirForLogs ifM (doesFileExist pathToCurrentLog) @@ -62,8 +59,7 @@ prepareLogsStructure nodeId nodeName rootDir format = do (createLogAndSymLink subDirForLogs format) return pathToCurrentLog where - subDirForLogs = rootDir nodeFullId - nodeFullId = T.unpack $ printNodeFullId nodeName nodeId + subDirForLogs = rootDir T.unpack anId -- This is a symlink to the current log file, please see rotation parameters. pathToCurrentLog = subDirForLogs symLinkName format diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Journal.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Journal.hs index 968fd07ea5c..f68e86b5c9b 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Journal.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Journal.hs @@ -36,11 +36,10 @@ import Cardano.Tracer.Types #if defined(LINUX) writeTraceObjectsToJournal :: NodeId - -> Text -> [TraceObject] -> IO () -writeTraceObjectsToJournal _ _ [] = return () -writeTraceObjectsToJournal nodeId nodeName traceObjects = +writeTraceObjectsToJournal _ [] = return () +writeTraceObjectsToJournal nodeId traceObjects = mapM_ (sendJournalFields . mkJournalFields) traceObjects where mkJournalFields trOb = @@ -51,7 +50,7 @@ writeTraceObjectsToJournal nodeId nodeName traceObjects = | otherwise -> HM.empty -- Both messages are empty! mkJournalFields' trOb msg = - syslogIdentifier (nodeName <> T.pack (show nodeId)) + syslogIdentifier (T.pack (show nodeId)) <> message msg <> priority (mkPriority $ L.toSeverity trOb) <> HM.fromList diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/TraceObjects.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/TraceObjects.hs index 90977851ee3..d4bf7358608 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/TraceObjects.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/TraceObjects.hs @@ -5,8 +5,6 @@ module Cardano.Tracer.Handlers.Logs.TraceObjects ) where import Control.Concurrent.Async (forConcurrently_) -import Control.Concurrent.STM.TVar (readTVarIO) -import Data.Map.Strict ((!?)) import qualified Data.List.NonEmpty as NE import Cardano.Logging (TraceObject) @@ -20,19 +18,13 @@ import Cardano.Tracer.Utils traceObjectsHandler :: TracerConfig -> NodeId - -> AcceptedNodeInfo -> [TraceObject] -> IO () -traceObjectsHandler _ _ _ [] = return () -traceObjectsHandler TracerConfig{logging} nodeId acceptedNodeInfo traceObjects = do - nodesInfo <- readTVarIO acceptedNodeInfo - case nodesInfo !? nodeId of - Nothing -> return () - Just ni -> do - let NodeInfo{niName} = ni - forConcurrently_ (NE.nub logging) $ \LoggingParams{logMode, logRoot, logFormat} -> - case logMode of - FileMode -> - showProblemIfAny $ writeTraceObjectsToFile nodeId niName logRoot logFormat traceObjects - JournalMode -> - showProblemIfAny $ writeTraceObjectsToJournal nodeId niName traceObjects +traceObjectsHandler _ _ [] = return () +traceObjectsHandler TracerConfig{logging} nodeId traceObjects = + forConcurrently_ (NE.nub logging) $ \LoggingParams{logMode, logRoot, logFormat} -> + case logMode of + FileMode -> + showProblemIfAny $ writeTraceObjectsToFile nodeId logRoot logFormat traceObjects + JournalMode -> + showProblemIfAny $ writeTraceObjectsToJournal nodeId traceObjects diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs index 03bf227d53e..555d489134a 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs @@ -1,45 +1,83 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PackageImports #-} {-# LANGUAGE ScopedTypeVariables #-} module Cardano.Tracer.Handlers.Metrics.Monitoring ( runMonitoringServer ) where -import Control.Concurrent (threadDelay) +import Control.Concurrent (ThreadId, killThread) +import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TVar (readTVarIO) -import Control.Exception (SomeException, try) -import Control.Monad (forever, unless) -import "contra-tracer" Control.Tracer (showTracing, stdoutTracer, traceWith) +import Control.Concurrent.STM.TMVar +import Control.Monad (forM, unless, void) +import Control.Monad.Extra (whenJust, whenJustM) import qualified Data.ByteString.Char8 as BSC import qualified Data.Map.Strict as M -import System.Remote.Monitoring (forkServerWith) +import qualified Data.Set as S +import qualified Data.Text as T +import qualified Graphics.UI.Threepenny as UI +import Graphics.UI.Threepenny.Core (UI, Element, liftIO, set, (#), (#+)) +import System.Remote.Monitoring (forkServerWith, serverThreadId) import Cardano.Tracer.Configuration -import Cardano.Tracer.Types (AcceptedMetrics) +import Cardano.Tracer.Types runMonitoringServer - :: Endpoint + :: (Endpoint, Endpoint) + -> ConnectedNodes -> AcceptedMetrics -> IO () -runMonitoringServer (Endpoint host port) acceptedMetrics = forever $ do - try serveEKGPage >>= \case - Left (e :: SomeException) -> - logTrace $ "cardano-tracer, problem with EKG web server: " <> show e - Right _ -> - return () - threadDelay 1000000 +runMonitoringServer (Endpoint listHost listPort, monitorEP) connectedNodes acceptedMetrics = + UI.startGUI config $ \window -> do + void $ return window # set UI.title "EKG Monitoring Nodes" + void $ mkPageBody connectedNodes monitorEP acceptedMetrics where - serveEKGPage = do - metrics <- readTVarIO acceptedMetrics - unless (M.null metrics) $ do - -- TODO: temporary solution for testing - -- (serve the metrics received from the first found node only). - let (storeForFirstNode, _) = snd . head . M.toList $ metrics - _server <- forkServerWith storeForFirstNode (BSC.pack host) port - waitForever + config = UI.defaultConfig + { UI.jsPort = Just listPort + , UI.jsAddr = Just $ BSC.pack listHost + } - waitForever = forever $ threadDelay 1000000000 +type CurrentEKGServer = TMVar (NodeId, ThreadId) - logTrace = traceWith $ showTracing stdoutTracer +mkPageBody + :: ConnectedNodes + -> Endpoint + -> AcceptedMetrics + -> UI Element +mkPageBody connectedNodes (Endpoint monitorHost monitorPort) acceptedMetrics = do + nodes <- liftIO $ S.toList <$> readTVarIO connectedNodes + if null nodes + then UI.string "There are no connected nodes yet" + else do + currentServer :: CurrentEKGServer <- liftIO newEmptyTMVarIO + nodesLinks <- + forM nodes $ \nodeId@(NodeId anId) -> do + nodeLink <- + UI.li #+ + [ UI.anchor # set UI.href ("http://" <> monitorHost <> ":" <> show monitorPort) + # set UI.target "_blank" + # set UI.title__ "Open EKG monitor page for this node" + # set UI.text (T.unpack anId) + ] + void $ UI.on UI.click nodeLink $ const $ + restartEKGServer nodeId acceptedMetrics monitorHost monitorPort currentServer + return $ UI.element nodeLink + UI.ul #+ nodesLinks + +restartEKGServer + :: NodeId + -> AcceptedMetrics + -> Host + -> Port + -> CurrentEKGServer + -> UI () +restartEKGServer newNodeId acceptedMetrics monitorHost monitorPort currentServer = liftIO $ do + metrics <- readTVarIO acceptedMetrics + whenJust (metrics M.!? newNodeId) $ \(storeForThisNode, _) -> + whenJustM (atomically $ tryTakeTMVar currentServer) $ \(curNodeId, sThread) -> + -- Only if the user clicked to the same node id in the list, we shouldn't restart EKG server. + unless (newNodeId == curNodeId) $ do + killThread sThread + ekgServer <- forkServerWith storeForThisNode (BSC.pack monitorHost) monitorPort + atomically $ putTMVar currentServer (newNodeId, serverThreadId ekgServer) diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs index 4f26e3666f4..d844a8b0e25 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs @@ -13,9 +13,9 @@ import Control.Monad (forM, forever) import Control.Monad.IO.Class (liftIO) import qualified Data.ByteString as BS import qualified Data.ByteString.Char8 as BSC -import Data.List (find) import qualified Data.HashMap.Strict as HM import qualified Data.Map.Strict as M +import qualified Data.Set as S import Data.String (IsString (..)) import Data.Text (Text) import qualified Data.Text as T @@ -25,6 +25,7 @@ import Snap.Core (Snap, getRequest, route, rqParams, writeText) import Snap.Http.Server (Config, ConfigLog (..), defaultConfig, setAccessLog, setBind, setErrorLog, setPort, simpleHttpServe) import System.Metrics (Sample, Value (..), sampleAll) +import System.Time.Extra (sleep) import Text.Blaze.Html import Text.Blaze.Html5 hiding (map) import Text.Blaze.Html5.Attributes hiding (title) @@ -34,17 +35,18 @@ import Cardano.Tracer.Types runPrometheusServer :: Endpoint + -> ConnectedNodes -> AcceptedMetrics - -> AcceptedNodeInfo -> IO () -runPrometheusServer (Endpoint host port) acceptedMetrics acceptedNodeInfo = forever $ +runPrometheusServer (Endpoint host port) connectedNodes acceptedMetrics = forever $ do -- If everything is okay, the function 'simpleHttpServe' never returns. -- But if there is some problem, it never throws an exception, but just stops. -- So if it stopped - it will be re-started. simpleHttpServe config $ - route [ ("metrics", renderListOfNodes) - , ("metrics/:nodefullid", renderMetricsFromNode) + route [ ("metrics", renderListOfConnectedNodes) + , ("metrics/:nodeid", renderMetricsFromNode) ] + sleep 1.0 where config :: Config Snap () config = @@ -54,35 +56,33 @@ runPrometheusServer (Endpoint host port) acceptedMetrics acceptedNodeInfo = fore . setErrorLog ConfigNoLog $ defaultConfig - renderListOfNodes :: Snap () - renderListOfNodes = - M.toList <$> liftIO (readTVarIO acceptedNodeInfo) >>= \case - [] -> writeText "There are no connected nodes yet." - ni -> blaze =<< liftIO (mkListOfHrefs ni) + renderListOfConnectedNodes :: Snap () + renderListOfConnectedNodes = + S.toList <$> liftIO (readTVarIO connectedNodes) >>= \case + [] -> writeText "There are no connected nodes yet." + nIds -> blaze =<< liftIO (mkListOfHrefs nIds) - mkListOfHrefs :: [(NodeId, NodeInfo)] -> IO Html - mkListOfHrefs ni = do - nodeHrefs <- forM ni $ \(nodeId, NodeInfo{niName}) -> do - let nodeFullId = T.unpack $ printNodeFullId niName nodeId - return $ a ! href (mkURL nodeFullId) $ toHtml nodeFullId + mkListOfHrefs :: [NodeId] -> IO Html + mkListOfHrefs nIds = do + nodeHrefs <- forM nIds $ \(NodeId anId) -> do + let anId' = T.unpack anId + return $ a ! href (mkURL anId') $ toHtml anId' return $ mkPage nodeHrefs mkURL :: String -> AttributeValue - mkURL nodeFullId = fromString $ - "http://" <> host <> ":" <> show port <> "/metrics/" <> nodeFullId + mkURL anId' = fromString $ + "http://" <> host <> ":" <> show port <> "/metrics/" <> anId' mkPage :: [Html] -> Html mkPage hrefs = html $ do head $ title "Prometheus metrics" - body $ ol $ mapM_ li hrefs + body $ ul $ mapM_ li hrefs renderMetricsFromNode :: Snap () renderMetricsFromNode = - getRequest >>= return . M.lookup "nodefullid" . rqParams >>= \case - Nothing -> - writeText "No such a node!" - Just nodeFullId -> - writeText =<< liftIO (getMetricsFromNode nodeFullId acceptedMetrics) + getRequest >>= return . M.lookup "nodeid" . rqParams >>= \case + Nothing -> writeText "No such a node!" + Just anId -> writeText =<< liftIO (getMetricsFromNode anId acceptedMetrics) type MetricName = Text type MetricValue = Text @@ -93,20 +93,15 @@ getMetricsFromNode -> AcceptedMetrics -> IO Text getMetricsFromNode [] _ = return "No such a node!" -getMetricsFromNode (nodeFullId':_) acceptedMetrics = do +getMetricsFromNode (anId':_) acceptedMetrics = do metrics <- readTVarIO acceptedMetrics - if M.null metrics - then return "No such a node!" - else do - case find nodeIdWeNeed $ M.keys metrics of - Nothing -> return "No such a node!" - Just nodeId -> do - let (ekgStore, _) = metrics M.! nodeId - sampleAll ekgStore >>= return . renderListOfMetrics . getListOfMetrics + case metrics M.!? nodeId of + Nothing -> + return "No such a node!" + Just (ekgStore, _) -> + sampleAll ekgStore >>= return . renderListOfMetrics . getListOfMetrics where - -- For example, "run-user-1000-core.sock" is suffix of "core-1--run-user-1000-core.sock" - nodeIdWeNeed nodeId = T.pack (show nodeId) `T.isSuffixOf` nodeFullId - nodeFullId = decodeUtf8 nodeFullId' + nodeId = NodeId $ decodeUtf8 anId' getListOfMetrics :: Sample -> MetricsList getListOfMetrics = filter (not . T.null . fst) . map metricsWeNeed . HM.toList @@ -116,7 +111,7 @@ getMetricsFromNode (nodeFullId':_) acceptedMetrics = do Counter c -> (mName, T.pack $ show c) Gauge g -> (mName, T.pack $ show g) Label l -> (mName, l) - _ -> ("", "") -- ekg-forward doesn't support 'Distribution' yet. + _ -> ("", "") -- 'ekg-forward' doesn't support 'Distribution' yet. renderListOfMetrics :: MetricsList -> Text renderListOfMetrics [] = "No metrics were received from this node." diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs index 1ebdcc98f04..8c546ce6b87 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs @@ -4,26 +4,25 @@ module Cardano.Tracer.Handlers.Metrics.Servers ( runMetricsServers ) where +import Control.Concurrent.Async.Extra (sequenceConcurrently) +import Control.Monad (void) + import Cardano.Tracer.Configuration (TracerConfig (..)) import Cardano.Tracer.Handlers.Metrics.Monitoring (runMonitoringServer) import Cardano.Tracer.Handlers.Metrics.Prometheus (runPrometheusServer) import Cardano.Tracer.Types -import Cardano.Tracer.Utils runMetricsServers :: TracerConfig + -> ConnectedNodes -> AcceptedMetrics - -> AcceptedNodeInfo -> IO () -runMetricsServers TracerConfig{hasEKG, hasPrometheus} acceptedMetrics acceptedNodeInfo = +runMetricsServers TracerConfig{hasEKG, hasPrometheus} connectedNodes acceptedMetrics = case (hasEKG, hasPrometheus) of - (Nothing, Nothing) -> - return () - (Nothing, Just prom) -> - runPrometheusServer prom acceptedMetrics acceptedNodeInfo - (Just ekg, Nothing) -> - runMonitoringServer ekg acceptedMetrics - (Just ekg, Just prom) -> - concurrently2 - (runPrometheusServer prom acceptedMetrics acceptedNodeInfo) - (runMonitoringServer ekg acceptedMetrics) + (Nothing, Nothing) -> return () + (Nothing, Just prom) -> runPrometheusServer prom connectedNodes acceptedMetrics + (Just ekg, Nothing) -> runMonitoringServer ekg connectedNodes acceptedMetrics + (Just ekg, Just prom) -> void . sequenceConcurrently $ + [ runPrometheusServer prom connectedNodes acceptedMetrics + , runMonitoringServer ekg connectedNodes acceptedMetrics + ] diff --git a/cardano-tracer/src/Cardano/Tracer/Run.hs b/cardano-tracer/src/Cardano/Tracer/Run.hs index 2548fdbf94c..8463b6b4422 100644 --- a/cardano-tracer/src/Cardano/Tracer/Run.hs +++ b/cardano-tracer/src/Cardano/Tracer/Run.hs @@ -3,14 +3,13 @@ -- | This top-level module is used by 'cardano-tracer' app. module Cardano.Tracer.Run ( runCardanoTracer - -- | For testing purposes. - , runCardanoTracerWithConfig - , runCardanoTracerWithConfigBrake + , doRunCardanoTracer ) where -import Control.Concurrent.STM.TVar (TVar) +import Control.Concurrent.Async.Extra (sequenceConcurrently) +import Control.Monad (void) -import Cardano.Tracer.Acceptors.Run (runAcceptors, runAcceptorsWithBrake) +import Cardano.Tracer.Acceptors.Run (runAcceptors) import Cardano.Tracer.CLI (TracerParams (..)) import Cardano.Tracer.Configuration (TracerConfig, readTracerConfig) import Cardano.Tracer.Handlers.Logs.Rotator (runLogsRotator) @@ -19,30 +18,20 @@ import Cardano.Tracer.Types import Cardano.Tracer.Utils runCardanoTracer :: TracerParams -> IO () -runCardanoTracer TracerParams{tracerConfig} = - readTracerConfig tracerConfig >>= runCardanoTracerWithConfig +runCardanoTracer TracerParams{tracerConfig} = do + config <- readTracerConfig tracerConfig + doRunCardanoTracer config =<< initProtocolsBrake -runCardanoTracerWithConfig +doRunCardanoTracer :: TracerConfig + -> ProtocolsBrake -> IO () -runCardanoTracerWithConfig config = do +doRunCardanoTracer config protocolsBrake = do + connectedNodes <- initConnectedNodes acceptedMetrics <- initAcceptedMetrics - acceptedNodeInfo <- initAcceptedNodeInfo dpAskers <- initDataPointAskers - concurrently3 - (runLogsRotator config) - (runMetricsServers config acceptedMetrics acceptedNodeInfo) - (runAcceptors config acceptedMetrics acceptedNodeInfo dpAskers) - -runCardanoTracerWithConfigBrake - :: TracerConfig - -> TVar Bool - -> IO () -runCardanoTracerWithConfigBrake config protocolsBrake = do - acceptedMetrics <- initAcceptedMetrics - acceptedNodeInfo <- initAcceptedNodeInfo - dpAskers <- initDataPointAskers - concurrently3 - (runLogsRotator config) - (runMetricsServers config acceptedMetrics acceptedNodeInfo) - (runAcceptorsWithBrake config acceptedMetrics acceptedNodeInfo dpAskers protocolsBrake) + void . sequenceConcurrently $ + [ runLogsRotator config + , runMetricsServers config connectedNodes acceptedMetrics + , runAcceptors config connectedNodes acceptedMetrics dpAskers protocolsBrake + ] diff --git a/cardano-tracer/src/Cardano/Tracer/Types.hs b/cardano-tracer/src/Cardano/Tracer/Types.hs index 5699ff56440..d315396e489 100644 --- a/cardano-tracer/src/Cardano/Tracer/Types.hs +++ b/cardano-tracer/src/Cardano/Tracer/Types.hs @@ -1,34 +1,20 @@ -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE OverloadedStrings #-} - module Cardano.Tracer.Types ( AcceptedMetrics - , AcceptedNodeInfo + , ConnectedNodes , DataPointAskers - , Metrics , NodeId (..) - , NodeInfo (..) - , connIdToNodeId - , initAcceptedMetrics - , initAcceptedNodeInfo - , initDataPointAskers - , printNodeFullId + , ProtocolsBrake ) where -import Control.Concurrent.STM.TVar (TVar, newTVarIO) +import Control.Concurrent.STM.TVar (TVar) import Data.Map.Strict (Map) -import qualified Data.Map.Strict as M +import Data.Set (Set) import Data.Text (Text) -import qualified Data.Text as T -import Data.Time.Clock (UTCTime) import qualified System.Metrics as EKG - -import Ouroboros.Network.Socket (ConnectionId (..)) +import System.Metrics.Store.Acceptor (MetricsLocalStore) import Trace.Forward.Utils.DataPoint (DataPointAsker) -import System.Metrics.Store.Acceptor (MetricsLocalStore) - -- | Unique identifier of the node, based on 'remoteAddress' from 'ConnectionId', -- please see 'ouroboros-network'. newtype NodeId = NodeId Text @@ -36,56 +22,18 @@ newtype NodeId = NodeId Text -- | We have to create EKG.Store and MetricsLocalStore -- to keep all the metrics accepted from the node. -type Metrics = (EKG.Store, TVar MetricsLocalStore) - -type AcceptedMetrics = TVar (Map NodeId Metrics) +type AcceptedMetrics = TVar (Map NodeId (EKG.Store, TVar MetricsLocalStore)) --- | We have to store 'DataPointAsker's to be able to ask particular node for some 'DataPoint's. +-- | We have to store 'DataPointAsker's to be able +-- to ask particular node for some 'DataPoint's. type DataPointAskers = TVar (Map NodeId DataPointAsker) --- | TMP! -data NodeInfo = NodeInfo - { niName :: !Text - , niProtocol :: !Text - , niVersion :: !Text - , niCommit :: !Text - , niStartTime :: !UTCTime - , niSystemStartTime :: !UTCTime - } - --- | 'NodeInfo' type is provided by the node (as one of 'DataPoint's) --- and contains important information about the node. --- --- Please note that 'NodeInfo' should be asked ASAP after the node is connected, --- because all the parts of 'cardano-tracer' need it. --- --- If the node is disconnected, corresponding 'NodeInfo' will be deleted, --- so 'AcceptedNodeInfo' is used as a "source of truth" about currently +-- | This set contains ids of currently connected nodes. +-- When the node is connected, its id will be added to this set, +-- and when it is disconnected, it's id will be deleted from this set. +-- So, 'ConnectedNodes' is used as a "source of truth" about currently -- connected nodes. -type AcceptedNodeInfo = TVar (Map NodeId NodeInfo) - -connIdToNodeId :: Show addr => ConnectionId addr -> NodeId -connIdToNodeId ConnectionId{remoteAddress} = NodeId preparedAddress - where - -- We have to remove "wrong" symbols from 'NodeId', - -- to make it appropriate for the name of the subdirectory. - preparedAddress = - T.replace "LocalAddress" "" -- There are only local addresses by design. - . T.replace " " "-" - . T.replace "\"" "" - . T.replace "/" "-" - . T.pack - $ show remoteAddress - -printNodeFullId :: Text -> NodeId -> Text -printNodeFullId "" (NodeId p) = T.drop 2 p -- In this case, '--' in the beginning is useless. -printNodeFullId nodeName (NodeId p) = nodeName <> p - -initAcceptedMetrics :: IO AcceptedMetrics -initAcceptedMetrics = newTVarIO M.empty - -initAcceptedNodeInfo :: IO AcceptedNodeInfo -initAcceptedNodeInfo = newTVarIO M.empty +type ConnectedNodes = TVar (Set NodeId) -initDataPointAskers :: IO DataPointAskers -initDataPointAskers = newTVarIO M.empty +-- | The flag we use to stop the protocols from their acceptor's side. +type ProtocolsBrake = TVar Bool diff --git a/cardano-tracer/src/Cardano/Tracer/Utils.hs b/cardano-tracer/src/Cardano/Tracer/Utils.hs index b0d2d94ad6f..90a20672c7d 100644 --- a/cardano-tracer/src/Cardano/Tracer/Utils.hs +++ b/cardano-tracer/src/Cardano/Tracer/Utils.hs @@ -1,34 +1,33 @@ {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PackageImports #-} {-# LANGUAGE ScopedTypeVariables #-} --- | This top-level module is used by 'cardano-tracer' app. module Cardano.Tracer.Utils - ( concurrently2 - , concurrently3 - , runInLoop + ( runInLoop , showProblemIfAny + , connIdToNodeId + , initConnectedNodes + , initAcceptedMetrics + , initDataPointAskers + , initProtocolsBrake + , applyBrake ) where -import Control.Concurrent.Async (concurrently_, withAsync, wait) +import Control.Concurrent.STM (atomically) +import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO) import Control.Exception (IOException, SomeAsyncException (..), fromException, try, tryJust) -import Control.Monad (void) import "contra-tracer" Control.Tracer (showTracing, stdoutTracer, traceWith) +import qualified Data.Map.Strict as M +import qualified Data.Set as S +import qualified Data.Text as T import System.Time.Extra (sleep) -concurrently2 :: IO () -> IO () -> IO () -concurrently2 = concurrently_ +import Ouroboros.Network.Socket (ConnectionId (..)) -concurrently3 :: IO () -> IO () -> IO () -> IO () -concurrently3 action1 action2 action3 = - withAsync action1 $ \a1 -> - withAsync action2 $ \a2 -> - withAsync action3 $ \a3 -> do - void $ wait a1 - void $ wait a2 - void $ wait a3 +import Cardano.Tracer.Types -- | Run monadic action in a loop. If there's an exception, it will re-run -- the action again, after pause that grows. @@ -64,3 +63,31 @@ showProblemIfAny action = Right _ -> return () where logTrace = traceWith $ showTracing stdoutTracer + +connIdToNodeId :: Show addr => ConnectionId addr -> NodeId +connIdToNodeId ConnectionId{remoteAddress} = NodeId preparedAddress + where + -- We have to remove "wrong" symbols from 'NodeId', + -- to make it appropriate for the name of the subdirectory. + preparedAddress = + T.replace "LocalAddress" "" -- There are only local addresses by design. + . T.replace " " "-" + . T.replace "\"" "" + . T.replace "/" "-" + . T.pack + $ show remoteAddress + +initConnectedNodes :: IO ConnectedNodes +initConnectedNodes = newTVarIO S.empty + +initAcceptedMetrics :: IO AcceptedMetrics +initAcceptedMetrics = newTVarIO M.empty + +initDataPointAskers :: IO DataPointAskers +initDataPointAskers = newTVarIO M.empty + +initProtocolsBrake :: IO ProtocolsBrake +initProtocolsBrake = newTVarIO False + +applyBrake :: ProtocolsBrake -> IO () +applyBrake stopProtocols = atomically $ modifyTVar' stopProtocols . const $ True diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs b/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs index 3007ad84784..cd658ffbaac 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs @@ -47,8 +47,11 @@ import qualified System.Metrics as EKG import Cardano.Logging (DetailLevel (..), SeverityS (..), TraceObject (..)) -import qualified Trace.Forward.Configuration.TraceObject as TF +import qualified Trace.Forward.Configuration.DataPoint as DPF +import qualified Trace.Forward.Configuration.TraceObject as TOF +import Trace.Forward.Run.DataPoint.Forwarder import Trace.Forward.Run.TraceObject.Forwarder +import Trace.Forward.Utils.DataPoint import Trace.Forward.Utils.TraceObject import qualified System.Metrics.Configuration as EKGF @@ -82,13 +85,13 @@ launchForwardersSimple' iomgr mode p connSize disconnSize = do (localSnocket iomgr) (localAddressFromPath p) noTimeLimitsHandshake - (ekgConfig, tfConfig) + (ekgConfig, tfConfig, dpfConfig) Responder -> doListenToAcceptor (localSnocket iomgr) (localAddressFromPath p) noTimeLimitsHandshake - (ekgConfig, tfConfig) + (ekgConfig, tfConfig, dpfConfig) where ekgConfig :: EKGF.ForwarderConfiguration ekgConfig = @@ -99,25 +102,36 @@ launchForwardersSimple' iomgr mode p connSize disconnSize = do , EKGF.actionOnRequest = const $ return () } - tfConfig :: TF.ForwarderConfiguration TraceObject + tfConfig :: TOF.ForwarderConfiguration TraceObject tfConfig = - TF.ForwarderConfiguration - { TF.forwarderTracer = nullTracer - , TF.acceptorEndpoint = p - , TF.disconnectedQueueSize = disconnSize - , TF.connectedQueueSize = connSize + TOF.ForwarderConfiguration + { TOF.forwarderTracer = nullTracer + , TOF.acceptorEndpoint = p + , TOF.disconnectedQueueSize = disconnSize + , TOF.connectedQueueSize = connSize + } + + dpfConfig :: DPF.ForwarderConfiguration + dpfConfig = + DPF.ForwarderConfiguration + { DPF.forwarderTracer = nullTracer + , DPF.acceptorEndpoint = p } doConnectToAcceptor :: Snocket IO fd addr -> addr -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) + -> ( EKGF.ForwarderConfiguration + , TOF.ForwarderConfiguration TraceObject + , DPF.ForwarderConfiguration + ) -> IO () -doConnectToAcceptor snocket address timeLimits (ekgConfig, tfConfig) = do +doConnectToAcceptor snocket address timeLimits (ekgConfig, tfConfig, dpfConfig) = do store <- EKG.newStore EKG.registerGcMetrics store sink <- initForwardSink tfConfig + dpStore <- initDataPointStore withAsync (traceObjectsWriter sink) $ \_ -> do connectToNode snocket @@ -129,8 +143,9 @@ doConnectToAcceptor snocket address timeLimits (ekgConfig, tfConfig) = do (simpleSingletonVersions UnversionedProtocol UnversionedProtocolData - (forwarderApp [ (forwardEKGMetrics ekgConfig store, 1) - , (forwardTraceObjectsInit tfConfig sink, 2) + (forwarderApp [ (forwardEKGMetrics ekgConfig store, 1) + , (forwardTraceObjectsInit tfConfig sink, 2) + , (forwardDataPointsInit dpfConfig dpStore, 3) ] ) ) @@ -155,12 +170,16 @@ doListenToAcceptor => Snocket IO fd addr -> addr -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) + -> ( EKGF.ForwarderConfiguration + , TOF.ForwarderConfiguration TraceObject + , DPF.ForwarderConfiguration + ) -> IO () -doListenToAcceptor snocket address timeLimits (ekgConfig, tfConfig) = do +doListenToAcceptor snocket address timeLimits (ekgConfig, tfConfig, dpfConfig) = do store <- EKG.newStore EKG.registerGcMetrics store sink <- initForwardSink tfConfig + dpStore <- initDataPointStore withAsync (traceObjectsWriter sink) $ \_ -> do networkState <- newNetworkMutableState race_ (cleanNetworkMutableState networkState) @@ -178,14 +197,14 @@ doListenToAcceptor snocket address timeLimits (ekgConfig, tfConfig) = do UnversionedProtocol UnversionedProtocolData (SomeResponderApplication $ - forwarderApp [ (forwardEKGMetricsResp ekgConfig store, 1) - , (forwardTraceObjectsResp tfConfig sink, 2) + forwarderApp [ (forwardEKGMetricsResp ekgConfig store, 1) + , (forwardTraceObjectsResp tfConfig sink, 2) + , (forwardDataPointsResp dpfConfig dpStore, 3) ] ) ) nullErrorPolicies - $ \_ serverAsync -> - wait serverAsync -- Block until async exception. + $ \_ serverAsync -> wait serverAsync -- Block until async exception. where forwarderApp :: [(RunMiniProtocol 'ResponderMode LBS.ByteString IO Void (), Word16)] diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs b/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs index 356826f3f01..c22cc47e2a8 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs @@ -8,8 +8,6 @@ module Cardano.Tracer.Test.Logs.Tests ) where import Control.Concurrent.Async (withAsync) -import Control.Concurrent.STM (atomically) -import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO) import Control.Monad (filterM) import qualified Data.List.NonEmpty as NE import Test.Tasty @@ -19,8 +17,9 @@ import System.FilePath import System.Time.Extra import Cardano.Tracer.Configuration -import Cardano.Tracer.Handlers.Logs.Log (isItLog, isItSymLink) -import Cardano.Tracer.Run (runCardanoTracerWithConfigBrake) +import Cardano.Tracer.Handlers.Logs.Utils (isItLog, isItSymLink) +import Cardano.Tracer.Run (doRunCardanoTracer) +import Cardano.Tracer.Utils import Cardano.Tracer.Test.Forwarder import Cardano.Tracer.Test.Utils @@ -33,11 +32,11 @@ tests = localOption (QuickCheckTests 1) $ testGroup "Test.Logs" propLogs :: LogFormat -> FilePath -> FilePath -> IO Property propLogs format rootDir localSock = do - stopProtocols <- newTVarIO False - withAsync (runCardanoTracerWithConfigBrake (config rootDir localSock) stopProtocols) $ \_ -> + stopProtocols <- initProtocolsBrake + withAsync (doRunCardanoTracer (config rootDir localSock) stopProtocols) $ \_ -> withAsync (launchForwardersSimple Responder localSock 1000 10000) $ \_ -> do sleep 15.0 -- Wait till some rotation is done. - atomically $ modifyTVar' stopProtocols . const $ True + applyBrake stopProtocols sleep 1.0 doesDirectoryExist rootDir >>= \case diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Network/Tests.hs b/cardano-tracer/test/Cardano/Tracer/Test/Network/Tests.hs index 572db51be8b..715b407a265 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Network/Tests.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Network/Tests.hs @@ -13,6 +13,7 @@ import System.Time.Extra (sleep) import Cardano.Tracer.Configuration import Cardano.Tracer.Run +import Cardano.Tracer.Utils import Cardano.Tracer.Test.Forwarder import Cardano.Tracer.Test.Utils @@ -28,18 +29,19 @@ tests = localOption (QuickCheckTests 1) $ testGroup "Test.Network" propNetwork :: SideToRestart -> FilePath -> FilePath -> IO Property propNetwork whichSide rootDir localSock = do + protocolsBrake <- initProtocolsBrake case whichSide of First -> propNetwork' rootDir - ( runCardanoTracerWithConfig (config Initiate rootDir localSock) + ( doRunCardanoTracer (config Initiate rootDir localSock) protocolsBrake , launchForwardersSimple Responder localSock 1000 10000 ) Second -> propNetwork' rootDir ( launchForwardersSimple Initiator localSock 1000 10000 - , runCardanoTracerWithConfig (config Response rootDir localSock) + , doRunCardanoTracer (config Response rootDir localSock) protocolsBrake ) propNetwork' :: FilePath -> (IO (), IO ()) -> IO Property diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Utils.hs b/cardano-tracer/test/Cardano/Tracer/Test/Utils.hs index 1dce885ec9e..32712f21d87 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Utils.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Utils.hs @@ -45,3 +45,16 @@ removeDirectoryContent dir = listContents dir >>= mapM_ removePathForcibly doesDirectoryEmpty :: FilePath -> IO Bool doesDirectoryEmpty dir = listContents dir >>= return . null + +{- + +data NodeInfo = NodeInfo + { niName :: !Text + , niProtocol :: !Text + , niVersion :: !Text + , niCommit :: !Text + , niStartTime :: !UTCTime + , niSystemStartTime :: !UTCTime + } + +-} diff --git a/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs b/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs index afbc8c76121..e889448c2f4 100644 --- a/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs +++ b/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs @@ -8,6 +8,7 @@ module Trace.Forward.Run.DataPoint.Acceptor ) where import qualified Codec.Serialise as CBOR +import Control.Exception (finally) import Control.Monad.Extra (ifM) import Control.Monad.STM (atomically, check) import Control.Concurrent.STM.TVar (modifyTVar', readTVar, readTVarIO) @@ -26,22 +27,25 @@ import Trace.Forward.Utils.DataPoint (DataPointAsker (..)) acceptDataPointsInit :: AcceptorConfiguration -> IO DataPointAsker + -> IO () -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -acceptDataPointsInit config mkDPAsker = - InitiatorProtocolOnly $ runPeerWithAsker config mkDPAsker +acceptDataPointsInit config mkDPAsker peerErrorHandler = + InitiatorProtocolOnly $ runPeerWithAsker config mkDPAsker peerErrorHandler acceptDataPointsResp :: AcceptorConfiguration -> IO DataPointAsker + -> IO () -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -acceptDataPointsResp config mkDPAsker = - ResponderProtocolOnly $ runPeerWithAsker config mkDPAsker +acceptDataPointsResp config mkDPAsker peerErrorHandler = + ResponderProtocolOnly $ runPeerWithAsker config mkDPAsker peerErrorHandler runPeerWithAsker :: AcceptorConfiguration -> IO DataPointAsker + -> IO () -> MuxPeer LBS.ByteString IO () -runPeerWithAsker config mkDPAsker = +runPeerWithAsker config mkDPAsker peerErrorHandler = MuxPeerRaw $ \channel -> do dpAsker <- mkDPAsker runPeer @@ -50,6 +54,7 @@ runPeerWithAsker config mkDPAsker = CBOR.encode CBOR.decode) channel (Acceptor.dataPointAcceptorPeer $ acceptorActions config dpAsker []) + `finally` peerErrorHandler acceptorActions :: AcceptorConfiguration diff --git a/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs b/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs index 89249fb40f7..24a1e5bdb29 100644 --- a/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs +++ b/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs @@ -13,7 +13,7 @@ import Control.Concurrent.Async (race) import Control.Monad.Extra (ifM) import Control.Monad.STM (atomically, check) import Control.Concurrent.STM.TVar (TVar, readTVar, readTVarIO, registerDelay) -import Control.Exception (Exception, throwIO) +import Control.Exception (Exception, finally, throwIO) import qualified Data.ByteString.Lazy as LBS import Data.Typeable (Typeable) import Data.Void (Void) @@ -34,9 +34,10 @@ acceptTraceObjectsInit Typeable lo) => AcceptorConfiguration lo -- ^ Acceptor's configuration. -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. + -> IO () -- ^ The handler for exceptions from 'runPeer'. -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -acceptTraceObjectsInit config loHandler = - InitiatorProtocolOnly $ runPeerWithHandler config loHandler +acceptTraceObjectsInit config loHandler peerErrorHandler = + InitiatorProtocolOnly $ runPeerWithHandler config loHandler peerErrorHandler acceptTraceObjectsResp :: (CBOR.Serialise lo, @@ -44,9 +45,10 @@ acceptTraceObjectsResp Typeable lo) => AcceptorConfiguration lo -- ^ Acceptor's configuration. -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. + -> IO () -- ^ The handler for exceptions from 'runPeer'. -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -acceptTraceObjectsResp config loHandler = - ResponderProtocolOnly $ runPeerWithHandler config loHandler +acceptTraceObjectsResp config loHandler peerErrorHandler = + ResponderProtocolOnly $ runPeerWithHandler config loHandler peerErrorHandler runPeerWithHandler :: (CBOR.Serialise lo, @@ -54,8 +56,9 @@ runPeerWithHandler Typeable lo) => AcceptorConfiguration lo -> ([lo] -> IO ()) + -> IO () -> MuxPeer LBS.ByteString IO () -runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} loHandler = +runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} loHandler peerErrorHandler = MuxPeerRaw $ \channel -> timeoutWhenStopped shouldWeStop @@ -66,6 +69,7 @@ runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} lo CBOR.encode CBOR.decode) channel (Acceptor.traceObjectAcceptorPeer $ acceptorActions config loHandler) + `finally` peerErrorHandler acceptorActions :: (CBOR.Serialise lo, @@ -96,7 +100,7 @@ timeoutWhenStopped stopVar delay action = either id id <$> race action ( do atomically (readTVar stopVar >>= check) - v <- registerDelay delay - atomically (readTVar v >>= check) + v <- registerDelay delay + atomically (readTVar v >>= check) throwIO Timeout )