From 047a376fc872deafc288e9232d07fdb1160070ae Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Fri, 24 Jan 2020 18:16:15 +0100 Subject: [PATCH 01/27] Replace the old ntp-client with a new implementation. --- nix/.stack.nix/default.nix | 1 - nix/.stack.nix/ntp-client.nix | 21 -- ntp-client/ntp-client.cabal | 51 +-- ntp-client/src/Network/NTP/Client.hs | 528 +++++++++++++-------------- ntp-client/src/Network/NTP/Packet.hs | 15 +- ntp-client/src/Network/NTP/Trace.hs | 57 +-- ntp-client/src/Network/NTP/Util.hs | 270 -------------- ntp-client/test/NtpApp.hs | 25 -- ntp-client/test/Test.hs | 32 +- stack.yaml | 1 - 10 files changed, 326 insertions(+), 675 deletions(-) delete mode 100644 ntp-client/src/Network/NTP/Util.hs delete mode 100644 ntp-client/test/NtpApp.hs diff --git a/nix/.stack.nix/default.nix b/nix/.stack.nix/default.nix index d1ec18c1857..aa58f913508 100644 --- a/nix/.stack.nix/default.nix +++ b/nix/.stack.nix/default.nix @@ -11,7 +11,6 @@ "hedgehog-quickcheck" = (((hackage.hedgehog-quickcheck)."0.1.1").revisions).default; "quickcheck-state-machine" = (((hackage.quickcheck-state-machine)."0.6.0").revisions).default; "splitmix" = (((hackage.splitmix)."0.0.2").revisions).default; - "time-units" = (((hackage.time-units)."1.0.0").revisions).default; "tasty-hedgehog" = (((hackage.tasty-hedgehog)."1.0.0.1").revisions).default; "Unique" = (((hackage.Unique)."0.4.7.6").revisions).default; "statistics-linreg" = (((hackage.statistics-linreg)."0.3").revisions).default; diff --git a/nix/.stack.nix/ntp-client.nix b/nix/.stack.nix/ntp-client.nix index e949946ee32..ab0a119c5c4 100644 --- a/nix/.stack.nix/ntp-client.nix +++ b/nix/.stack.nix/ntp-client.nix @@ -22,38 +22,17 @@ (hsPkgs.binary) (hsPkgs.bytestring) (hsPkgs.contra-tracer) - (hsPkgs.formatting) (hsPkgs.network) (hsPkgs.stm) - (hsPkgs.these) (hsPkgs.time) - (hsPkgs.time-units) ]; }; - exes = { - "ntp-app" = { - depends = [ - (hsPkgs.async) - (hsPkgs.base) - (hsPkgs.binary) - (hsPkgs.bytestring) - (hsPkgs.contra-tracer) - (hsPkgs.formatting) - (hsPkgs.network) - (hsPkgs.stm) - (hsPkgs.these) - (hsPkgs.time) - (hsPkgs.time-units) - ]; - }; - }; tests = { "ntp-client-test" = { depends = [ (hsPkgs.base) (hsPkgs.binary) (hsPkgs.time) - (hsPkgs.time-units) (hsPkgs.QuickCheck) (hsPkgs.tasty) (hsPkgs.tasty-quickcheck) diff --git a/ntp-client/ntp-client.cabal b/ntp-client/ntp-client.cabal index 44e62ffe39b..9b40cbf7188 100644 --- a/ntp-client/ntp-client.cabal +++ b/ntp-client/ntp-client.cabal @@ -11,61 +11,32 @@ cabal-version: >=1.20 Library exposed-modules: Network.NTP.Client - Network.NTP.Util Network.NTP.Packet Network.NTP.Trace - build-depends: async - , base - , binary >= 0.8 - , bytestring - , contra-tracer - , formatting - , network - , stm - , these - , time - , time-units + build-depends: async >=2.2 && <2.3 + , base >=4.9 && <4.13 + , binary >=0.8 && <0.9 + , bytestring >=0.10 && <0.11 + , contra-tracer >=0.1 && <0.2 + , network >= 3.1 && <3.2 + , stm >=2.4 && <2.6 + , time >=1.6 && <1.10 hs-source-dirs: src default-language: Haskell2010 - ghc-options: -Wall -Werror -fwarn-redundant-constraints + ghc-options: -Wall default-extensions: DeriveDataTypeable DeriveGeneric GeneralizedNewtypeDeriving - OverloadedStrings - MonadFailDesugaring - --- Just for testing: to be removed later. -Executable ntp-app - hs-source-dirs: test, src - main-is: NtpApp.hs - default-language: Haskell2010 - ghc-options: -Wall -Werror -fwarn-redundant-constraints - other-modules: Network.NTP.Client - Network.NTP.Util - Network.NTP.Packet - Network.NTP.Trace - build-depends: async - , base - , binary >= 0.8 - , bytestring - , contra-tracer - , formatting - , network - , stm - , these - , time - , time-units test-suite ntp-client-test hs-source-dirs: test, src main-is: Test.hs type: exitcode-stdio-1.0 other-modules: Network.NTP.Packet - build-depends: base - , binary >= 0.8 + build-depends: base >=4.9 && <4.13 + , binary , time - , time-units , QuickCheck , tasty , tasty-quickcheck diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 4a812377637..e12ecffb4f1 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -1,65 +1,69 @@ -{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE KindSignatures #-} {-# LANGUAGE ScopedTypeVariables #-} - --- | This module implements functionality of NTP client. +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NumericUnderscores #-} module Network.NTP.Client - ( NtpClientSettings (..) - , NtpClient (..) - , withNtpClient - ) where +where import Control.Concurrent (threadDelay) -import Control.Concurrent.Async (async, concurrently_, race, race_) -import Control.Concurrent.STM -import Control.Exception (Exception, IOException, bracket, catch, handle) -import Control.Monad (forever, when) +import Control.Concurrent.Async +import Control.Concurrent.STM (STM, atomically, check) +import Control.Concurrent.STM.TVar +import Control.Exception (bracket) +import System.IO.Error (catchIOError, tryIOError, userError, ioError) +import Control.Monad (forever, forM, forM_, when) import Control.Tracer -import Data.Binary (decodeOrFail) +import Data.Binary (decodeOrFail, encode) import qualified Data.ByteString.Lazy as LBS -import Data.List.NonEmpty (NonEmpty) -import qualified Data.List.NonEmpty as NE -import Data.Maybe (catMaybes) -import Data.Semigroup (Last (..)) -import Data.These (These (..)) -import Data.Time.Units (Microsecond) -import Data.Typeable (Typeable) +import Data.List (find) +import Data.Maybe +import Network.Socket ( AddrInfo () + , AddrInfoFlag (AI_ADDRCONFIG, AI_PASSIVE) + , Family (AF_INET, AF_INET6) + , PortNumber + , Socket + , SockAddr (..) + , SocketOption (ReuseAddr) + , SocketType (Datagram) + , addrAddress + , addrFamily + , addrFlags + , addrSocketType) import qualified Network.Socket as Socket -import Network.Socket.ByteString (recvFrom) - -import Network.NTP.Packet (NtpOffset (..) , NtpPacket (..), clockOffset, - mkNtpPacket, ntpPacketSize) -import Network.NTP.Trace (NtpTrace (..)) -import Network.NTP.Util (AddrFamily (..), Addresses, Sockets, - WithAddrFamily (..), createAndBindSock, - resolveNtpHost, - runWithAddrFamily, sendPacket, udpLocalAddresses) +import qualified Network.Socket.ByteString as Socket.ByteString (recvFrom, sendManyTo) +import Network.NTP.Packet ( NtpPacket + , mkNtpPacket + , ntpPacketSize + , Microsecond + , NtpOffset (..) + , getCurrentTime + , clockOffsetPure) +import Network.NTP.Trace (NtpTrace (..), IPVersion (..)) + +main :: IO () +main = testClient data NtpClientSettings = NtpClientSettings { ntpServers :: [String] - -- ^ list of servers addresses + -- ^ List of servers addresses. , ntpResponseTimeout :: Microsecond - -- ^ delay between making requests and response collection + -- ^ Timeout between sending NTP requests and response collection. , ntpPollDelay :: Microsecond - -- ^ how long to wait between to send requests to the servers - , ntpSelection :: NonEmpty NtpOffset -> NtpOffset - -- ^ way to sumarize results received from different servers. - -- this may accept list of lesser size than @length ntpServers@ in case - -- some servers failed to respond in time, but never an empty list + -- ^ How long to wait between two rounds of requests. + , ntpReportPolicy :: [ReceivedPacket] -> Maybe NtpOffset } data NtpClient = NtpClient { -- | Query the current NTP status. ntpGetStatus :: STM NtpStatus - -- | Bypass all internal threadDelays and trigger a new NTP query. , ntpTriggerUpdate :: IO () + , ntpThread :: Async () } data NtpStatus = @@ -71,248 +75,220 @@ data NtpStatus = -- `ntpResponseTimeout` or NTP was not configured. | NtpSyncUnavailable deriving (Eq, Show) --- | Setup a NtpClient and run a computation that uses that client. --- Todo : proper bracket-style tear-down of the NTP client. +data ReceivedPacket = ReceivedPacket + { receivedPacket :: !NtpPacket + , receivedLocalTime :: !Microsecond + , receivedOffset :: !NtpOffset + } deriving (Eq, Show) + +-- | Wait for at least three replies and report the minimum of the reported offsets. +minimumOfThree :: [ReceivedPacket] -> Maybe NtpOffset +minimumOfThree l + = if length l >= 3 then Just $ minimum $ map receivedOffset l + else Nothing + +-- | Setup a NtpClient and run a application that uses that client. +-- The NtpClient is terminated when the application returns. +-- And also the application is terminated when the NtpClient crashes. withNtpClient :: Tracer IO NtpTrace -> NtpClientSettings -> (NtpClient -> IO a) -> IO a -withNtpClient tracer ntpSettings action - = forkNtpClient tracer ntpSettings >>= action - --- This function should be called once, it will run an NTP client in a new --- thread until the program terminates. -forkNtpClient :: Tracer IO NtpTrace -> NtpClientSettings -> IO NtpClient -forkNtpClient tracer ntpSettings = do +withNtpClient tracer ntpSettings action = do traceWith tracer NtpTraceStartNtpClient - ncStatus <- newTVarIO NtpSyncPending - -- using async so the NTP thread will be left running even if the parent - -- thread finished. - _ <- async (spawnNtpClient tracer ntpSettings ncStatus) - return $ NtpClient - { ntpGetStatus = readTVar ncStatus - , ntpTriggerUpdate = do - traceWith tracer NtpTraceClientActNow - atomically $ writeTVar ncStatus NtpSyncPending - } - -data NtpState = NtpState - { ncSockets :: TVar Sockets - -- ^ Ntp client sockets: ipv4 / ipv6 / both. - , ncState :: TVar [NtpOffset] - -- ^ List of ntp offsets and origin times (i.e. time when a request was - -- send) received from ntp servers since last polling interval. - , ncStatus :: TVar NtpStatus - -- ^ Ntp status: holds `NtpOffset` or a status of ntp client: - -- `NtpSyncPending`, `NtpSyncUnavailable`. It is computed from `ncState` - -- once all responses arrived. - , ncSettings :: NtpClientSettings - -- ^ Ntp client configuration. - } - -mkNtpClient :: NtpClientSettings -> TVar NtpStatus -> Sockets -> IO NtpState -mkNtpClient ncSettings ncStatus sock = do - ncSockets <- newTVarIO sock - ncState <- newTVarIO [] - return NtpState{..} - -data NoHostResolved = NoHostResolved - deriving (Show, Typeable) - -instance Exception NoHostResolved - -updateStatus :: Tracer IO NtpTrace -> NtpState -> IO () -updateStatus tracer cli = do - offsets <- readTVarIO (ncState cli) - status <- case offsets of - [] -> do - traceWith tracer NtpTraceUpdateStatusNoResponses - return NtpSyncUnavailable - l -> do - let o = ntpSelection (ncSettings cli) $ NE.fromList l - traceWith tracer $ NtpTraceUpdateStatusClockOffset $ getNtpOffset o - return $ NtpDrift o - atomically $ writeTVar (ncStatus cli) status - --- Every `ntpPollDelay` we send a request to the list of `ntpServers`. Before --- sending a request, we put `NtpSyncPending` to `ncState`. After sending --- all requests we wait until either all servers responded or --- `ntpResponseTimeout` passesed. If at least one server responded --- `handleCollectedResponses` will update `ncStatus` in `NtpClient` with a new --- drift. -sendLoop :: Tracer IO NtpTrace -> NtpState -> [Addresses] -> IO () -sendLoop tracer cli addrs = forever $ do - let respTimeout = ntpResponseTimeout (ncSettings cli) - let poll = ntpPollDelay (ncSettings cli) - - -- send packets and wait until end of poll delay - sock <- readTVarIO $ ncSockets cli - pack <- mkNtpPacket - sendPacket tracer sock pack addrs - - _ <- timeout respTimeout waitForResponses - updateStatus tracer cli - -- after @'updateStatus'@ @'ntpStatus'@ is guaranteed to be - -- different from @'NtpSyncPending'@, now we can wait until it was - -- changed back to @'NtpSyncPending'@ to force a request. - _ <- timeout poll waitForRequest - - -- reset state & status before next loop - atomically $ writeTVar (ncState cli) [] - atomically $ writeTVar (ncStatus cli) NtpSyncPending - + ntpStatus <- newTVarIO NtpSyncPending + withAsync (ntpClientThread tracer (ntpSettings, ntpStatus)) $ \tid -> do + let client = NtpClient + { ntpGetStatus = readTVar ntpStatus + , ntpTriggerUpdate = do + traceWith tracer NtpTraceClientActNow + atomically $ writeTVar ntpStatus NtpSyncPending + , ntpThread = tid + } + link tid -- an error in the ntp-client kills the appliction ! + action client + +udpLocalAddresses :: IO [AddrInfo] +udpLocalAddresses = do + let hints = Socket.defaultHints + { addrFlags = [AI_PASSIVE] + , addrSocketType = Datagram } + port = Socket.defaultPort + -- Hints Host Service + Socket.getAddrInfo (Just hints) Nothing (Just $ show port) + +resolveHost :: String -> IO [AddrInfo] +resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing + where + hints = Socket.defaultHints + { addrSocketType = Datagram + , addrFlags = [AI_ADDRCONFIG] -- since we use @AF_INET@ family + } + +firstAddr :: String -> [AddrInfo] -> IO (Maybe AddrInfo, Maybe AddrInfo) +firstAddr name l = case (find isV4Addr l, find isV6Addr l) of + (Nothing, Nothing) -> ioError $ userError $ "lookup host failed :" ++ name + p -> return p where - waitForResponses = do - atomically $ do - resps <- readTVar $ ncState cli - let svs = length $ ntpServers $ ncSettings cli - when (length resps < svs) - retry - traceWith tracer NtpTraceSendLoopCollectedAllResponses + isV4Addr :: AddrInfo -> Bool + isV4Addr addr = addrFamily addr == AF_INET - -- Wait for a request to force an ntp check. - waitForRequest = - atomically $ do - status <- readTVar $ ncStatus cli - check (status == NtpSyncPending) - return () + isV6Addr :: AddrInfo -> Bool + isV6Addr addr = addrFamily addr == AF_INET6 - timeout :: Microsecond -> IO a -> IO (Either () a) - timeout t io = race (threadDelay (fromIntegral t)) io --- | --- Listen for responses on the socket @'ncSockets'@ -receiveLoop :: Tracer IO NtpTrace -> NtpState -> IO () -receiveLoop tracer cli = - readTVarIO (ncSockets cli) >>= \case - These (Last (WithIPv6 sock_ipv6)) (Last (WithIPv4 sock_ipv4)) -> - loop IPv6 sock_ipv6 - `concurrently_` - loop IPv4 sock_ipv4 - This (Last (WithIPv6 sock_ipv6)) -> - loop IPv6 sock_ipv6 - That (Last (WithIPv4 sock_ipv4)) -> - loop IPv4 sock_ipv4 +setNtpPort :: SockAddr -> SockAddr +setNtpPort addr = case addr of + (SockAddrInet _ host) -> SockAddrInet ntpPort host + (SockAddrInet6 _ flow host scope) -> SockAddrInet6 ntpPort flow host scope + sockAddr -> sockAddr + where + ntpPort :: PortNumber + ntpPort = 123 + +threadDelayInterruptible :: TVar NtpStatus -> Int -> IO () +threadDelayInterruptible tvar t + = race_ + ( threadDelay t ) + ( atomically $ do + s <- readTVar tvar + check $ s == NtpSyncPending + ) + +-- TODO: maybe reset the delaytime if the oneshotClient did one sucessful query +ntpClientThread :: + Tracer IO NtpTrace + -> (NtpClientSettings, TVar NtpStatus) + -> IO () +ntpClientThread tracer args@(_, ntpStatus) = forM_ restartDelay $ \t -> do + traceWith tracer $ NtpTraceRestartDelay t + threadDelayInterruptible ntpStatus $ t * 1_000_000 + traceWith tracer NtpTraceRestartingClient + (forever $ oneshotClient tracer args) `catchIOError` + \err -> traceWith tracer $ NtpTraceIOError err + atomically $ writeTVar ntpStatus NtpSyncUnavailable where - -- Receive responses from the network and update NTP client state. - loop :: AddrFamily -> Socket.Socket -> IO () - loop addressFamily sock - = handle (handleIOException addressFamily) $ forever $ do - (bs, _) <- recvFrom sock ntpPacketSize - case decodeOrFail $ LBS.fromStrict bs of - Left (_, _, err) -> - traceWith tracer $ NtpTraceReceiveLoopDecodeError err - Right (_, _, packet) -> - handleNtpPacket packet - - -- Restart the @loop@ in case of errors; wait 5s before recreating the - -- socket. - handleIOException - :: AddrFamily - -> IOException - -> IO () - handleIOException addressFamily e = do - traceWith tracer $ NtpTraceReceiveLoopHandleIOException e - threadDelay 5000000 - udpLocalAddresses >>= createAndBindSock tracer addressFamily >>= \case - Nothing -> do - traceWith tracer NtpTraceReceiveLoopException --- logWarning "recreating of sockets failed (retrying)" - handleIOException addressFamily e - Just sock -> do - atomically $ modifyTVar' (ncSockets cli) (\s -> s <> sock) - case sock of - This (Last sock_) - -> loop addressFamily $ runWithAddrFamily sock_ - That (Last sock_) - -> loop addressFamily $ runWithAddrFamily sock_ - These _ _ - -> error "NtpClient: startReceive: impossible" - - -- Compute the clock offset based on current time and record it in the NTP - -- client state. A packet will be disgarded if it came after - -- @'ntpResponseTimeout'@. - handleNtpPacket - :: NtpPacket - -> IO () - handleNtpPacket packet = do - traceWith tracer NtpTraceReceiveLoopPacketReceived -- packet - clockOffset (ntpResponseTimeout $ ncSettings cli) packet >>= \case - Nothing -> traceWith tracer NtpTraceReceiveLoopLatePacket - Just offset -> do - traceWith tracer $ NtpTraceReceiveLoopPacketDeltaTime $ getNtpOffset offset - atomically $ modifyTVar' (ncState cli) ( offset : ) - --- | --- Spawn NTP client which will send request to NTP servers every @'ntpPollDelay'@ --- and will listen for responses. The @'ncStatus'@ will be updated every --- @'ntpPollDelay'@ with the most recent value. It should be run in a separate --- thread, since it will block infinitely. -spawnNtpClient :: Tracer IO NtpTrace -> NtpClientSettings -> TVar NtpStatus -> IO () -spawnNtpClient tracer settings ncStatus = do - traceWith tracer NtpTraceSpawnNtpClientStarting - bracket (mkSockets tracer settings) closeSockets $ \sock -> do - cli <- mkNtpClient settings ncStatus sock - - addrs <- resolve - -- TODO - -- we should start listening for requests when we send something, since - -- we're not expecting anything to come unless we send something. This - -- way we could simplify the client and remove `ncState` mutable cell. - receiveLoop tracer cli - `concurrently_` sendLoop tracer cli addrs - `concurrently_` traceWith tracer NtpTraceSpawnNtpClientStarted - + restartDelay :: [Int] + restartDelay = [0, 5, 10, 20, 60, 180, 600] ++ repeat 600 + +-- | Setup and run the NTP client. +-- In case of an IOError (for example when network interface goes down) cleanup and return. + +oneshotClient :: + Tracer IO NtpTrace + -> (NtpClientSettings, TVar NtpStatus) + -> IO () +oneshotClient tracer (ntpSettings, ntpStatus) = do + traceWith tracer NtpTraceClientStartQuery + (v4Servers, v6Servers) <- lookupServers $ ntpServers ntpSettings + (v4LocalAddr, v6LocalAddr) <- udpLocalAddresses >>= firstAddr "localhost" + (v4Replies, v6Replies) <- concurrently (runProtocol IPv4 v4LocalAddr v4Servers) + (runProtocol IPv6 v6LocalAddr v6Servers) + when (null v4Replies && null v6Replies) $ do + traceWith tracer NtpTraceIPv4IPv6BothFailed + atomically $ writeTVar ntpStatus NtpSyncUnavailable + ioError $ userError "IPv4 and IPv6 failed" + case (ntpReportPolicy ntpSettings) (v4Replies ++ v6Replies) of + Nothing -> do + traceWith tracer NtpTraceUpdateStatusQueryFailed + atomically $ writeTVar ntpStatus NtpSyncUnavailable + Just offset -> do + traceWith tracer $ NtpTraceUpdateStatusClockOffset $ getNtpOffset offset + atomically $ writeTVar ntpStatus $ NtpDrift offset + traceWith tracer NtpTraceClientSleeping + threadDelayInterruptible ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings where - closeSockets :: Sockets -> IO () - closeSockets sockets = do - case sockets of - This a -> fn a - That a -> fn a - These a b -> fn a >> fn b - traceWith tracer NtpTraceSpawnNtpClientSocketsClosed - - fn :: Last (WithAddrFamily t Socket.Socket) -> IO () - fn (Last sock) = Socket.close $ runWithAddrFamily sock - - -- Try to resolve addresses, on failure wait 30s and start again. - resolve = do - traceWith tracer NtpTraceSpawnNtpClientResolveDNS - addrs <- catMaybes <$> traverse (resolveNtpHost tracer) (ntpServers settings) - if null addrs - then do - atomically $ writeTVar ncStatus NtpSyncUnavailable - -- either wait 30s or wait for `NtpSyncPending` which might be set - -- by a client (e.g. wallet), in which case try to resolve the dns. - race_ - (threadDelay 30000000) - (do - atomically $ do - s <- readTVar ncStatus - case s of - NtpSyncPending -> return () - _ -> retry - traceWith tracer NtpTraceSpawnNtpClientResolvePending - ) - resolve - else return addrs - --- Try to create IPv4 and IPv6 socket. -mkSockets :: Tracer IO NtpTrace -> NtpClientSettings -> IO Sockets -mkSockets tracer settings = - doMkSockets `catch` handleIOException >>= \case - Just socks -> pure socks - Nothing -> do - traceWith tracer NtpTraceMkSocketsNoSockets --- logWarning "Couldn't create either IPv4 or IPv6 socket, retrying in 5 sec..." - threadDelay 5000000 - mkSockets tracer settings + runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [ReceivedPacket] + runProtocol _version _localAddr [] = return [] + runProtocol _version Nothing _ = return [] + runProtocol version (Just addr) servers = do + queryServers tracer ntpSettings addr servers >>= \case + Left err -> do + traceWith tracer $ NtpTraceRunProtocolError version err + return [] + Right [] -> do + traceWith tracer $ NtpTraceRunProtocolNoResult version + return [] + Right r@(_:_) -> do + traceWith tracer $ NtpTraceRunProtocolSuccess version + return r + +queryServers :: + Tracer IO NtpTrace + -> NtpClientSettings + -> AddrInfo + -> [AddrInfo] + -> IO (Either IOError [ReceivedPacket]) +queryServers tracer netSettings localAddr destAddrs + = tryIOError $ bracket acquire release action where - doMkSockets :: IO (Maybe Sockets) - doMkSockets = do - addrs <- udpLocalAddresses - (<>) <$> (createAndBindSock tracer IPv4 addrs) - <*> (createAndBindSock tracer IPv6 addrs) - - handleIOException :: IOException -> IO (Maybe Sockets) - handleIOException e = do - traceWith tracer $ NtpTraceMkSocketsIOExecption e - threadDelay 5000000 - doMkSockets + acquire :: IO Socket + acquire = do + s <- Socket.socket (addrFamily localAddr) Datagram Socket.defaultProtocol + traceWith tracer NtpTraceSocketOpen + return s + + release :: Socket -> IO () + release s = do + Socket.close s + traceWith tracer NtpTraceSocketClosed + + action :: Socket -> IO [ReceivedPacket] + action socket = do + Socket.setSocketOption socket ReuseAddr 1 + inQueue <- atomically $ newTVar [] + _err <- withAsync (send socket >> loopForever) $ \sender -> + withAsync timeout $ \delay -> + withAsync (reader socket inQueue ) $ \revc -> + waitAnyCancel [sender, delay, revc] + atomically $ readTVar inQueue + + send :: Socket -> IO () + send sock = forM_ destAddrs $ \addr -> do + p <- mkNtpPacket + err <- tryIOError $ Socket.ByteString.sendManyTo sock + (LBS.toChunks $ encode p) (setNtpPort $ Socket.addrAddress addr) + case err of + Right _ -> traceWith tracer NtpTracePacketSent + Left e -> do + traceWith tracer $ NtpTracePacketSentError e + ioError e + threadDelay 100_000 + + loopForever = forever $ threadDelay maxBound + + timeout = do + threadDelay $ fromIntegral $ ntpResponseTimeout netSettings + traceWith tracer NtpTraceClientWaitingForRepliesTimeout + + reader :: Socket -> TVar [ReceivedPacket] -> IO () + reader socket inQueue = forever $ do + (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize + t <- getCurrentTime + case decodeOrFail $ LBS.fromStrict bs of + Left (_, _, err) -> traceWith tracer $ NtpTraceSocketReaderDecodeError err + Right (_, _, packet) -> do + -- todo : filter bad packets, i.e. late packets and spoofed packets + traceWith tracer NtpTraceReceiveLoopPacketReceived + let received = ReceivedPacket packet t (clockOffsetPure packet t) + atomically $ modifyTVar' inQueue ((:) received) + +lookupServers :: [String] -> IO ([AddrInfo], [AddrInfo]) +lookupServers names = do + dests <- forM names $ \server -> resolveHost server >>= firstAddr server + return (mapMaybe fst dests, mapMaybe snd dests) + +testClient :: IO () +testClient = withNtpClient (contramapM (return . show) stdoutTracer) settings runApplication + where + runApplication ntpClient = race_ getLine $ forever $ do + status <- atomically $ ntpGetStatus ntpClient + traceWith stdoutTracer $ show ("main"::String, status) + threadDelay 10_000_000 + ntpTriggerUpdate ntpClient + + settings :: NtpClientSettings + settings = NtpClientSettings + { ntpServers = ["0.de.pool.ntp.org", "0.europe.pool.ntp.org", "0.pool.ntp.org" + , "1.pool.ntp.org", "2.pool.ntp.org", "3.pool.ntp.org"] + , ntpResponseTimeout = fromInteger 1_000_000 + , ntpPollDelay = fromInteger 300_000_000 + , ntpReportPolicy = minimumOfThree + } diff --git a/ntp-client/src/Network/NTP/Packet.hs b/ntp-client/src/Network/NTP/Packet.hs index f4969abc415..d0ea8a78642 100644 --- a/ntp-client/src/Network/NTP/Packet.hs +++ b/ntp-client/src/Network/NTP/Packet.hs @@ -7,10 +7,12 @@ module Network.NTP.Packet , ntpPacketSize , mkNtpPacket , NtpOffset (..) + , getCurrentTime , clockOffsetPure , clockOffset , realMcsToNtp , ntpToRealMcs + , Microsecond (..) ) where @@ -20,10 +22,11 @@ import Data.Binary.Get (getInt8, getWord32be, getWord8, skip) import Data.Binary.Put (putInt8, putWord32be, putWord8) import Data.Int (Int8) import Data.Time.Clock.POSIX (getPOSIXTime) -import Data.Time.Units (Microsecond, TimeUnit, fromMicroseconds, - toMicroseconds) import Data.Word (Word32, Word8) +newtype Microsecond = Microsecond Integer + deriving (Enum, Eq, Integral, Num, Ord, Real, Show) + data NtpPacket = NtpPacket { ntpParams :: Word8 -- ^ some magic parameters , ntpPoll :: Int8 -- ^ poll delay between requests @@ -98,7 +101,7 @@ ntpToRealMcs sec frac = -- ref: https://tools.ietf.org/html/rfc5905#section-6 fracMicro :: Integer fracMicro = (fromIntegral frac) `div` 4294 - in fromMicroseconds $ secMicro + fracMicro + in Microsecond $ secMicro + fracMicro -- | -- It is a partial function, since @Microsecond ~ Integer@; it is well defined @@ -108,7 +111,7 @@ ntpToRealMcs sec frac = -- @ -- (in microseconds; this is roughly 66 years, so we're fine untill 2036). realMcsToNtp :: Microsecond -> (Word32, Word32) -realMcsToNtp (toMicroseconds -> mcs) = +realMcsToNtp (Microsecond mcs) = let (sec, frac) = divMod mcs 1000000 in ( fromIntegral $ sec + ntpTimestampDelta , fromIntegral $ frac * 4294) @@ -127,7 +130,7 @@ mkNtpPacket = do -- | -- @'NtpOffset'@ is the difference between NTP time and local time. newtype NtpOffset = NtpOffset { getNtpOffset :: Microsecond } - deriving (Enum, Eq, Integral, Num, Ord, Real, Show, TimeUnit) + deriving (Enum, Eq, Integral, Num, Ord, Real, Show) clockOffsetPure :: NtpPacket -> Microsecond -> NtpOffset clockOffsetPure NtpPacket{..} localTime = NtpOffset @@ -152,4 +155,4 @@ clockOffset respTimeout packet = do -- | -- Helper function to get current time in @Microsecond@. getCurrentTime :: IO Microsecond -getCurrentTime = fromMicroseconds . round . ( * 1000000) <$> getPOSIXTime +getCurrentTime = Microsecond . round . ( * 1000000) <$> getPOSIXTime diff --git a/ntp-client/src/Network/NTP/Trace.hs b/ntp-client/src/Network/NTP/Trace.hs index ad21a6c9d33..751a480a99c 100644 --- a/ntp-client/src/Network/NTP/Trace.hs +++ b/ntp-client/src/Network/NTP/Trace.hs @@ -1,35 +1,42 @@ module Network.NTP.Trace where import Control.Exception (IOException) -import Data.Time.Units (Microsecond) +import Network.NTP.Packet (Microsecond) + +data IPVersion = IPv4 | IPv6 + deriving (Show) data NtpTrace - = NtpTraceStartNtpClient + = NtpTraceIOError IOError + | NtpTraceStartNtpClient | NtpTraceClientActNow - | NtpTraceClientForceCheck - | NtpTraceClientAbort - | NtpTraceUpdateStatusNoResponses + | NtpTraceRestartDelay Int + | NtpTraceRestartingClient + | NtpTraceClientStartQuery + | NtpTraceRunProtocolSuccess !IPVersion + | NtpTraceRunProtocolNoResult !IPVersion + | NtpTraceRunProtocolError !IPVersion IOError + | NtpTraceIPv4IPv6BothFailed + | NtpTraceUpdateStatusQueryFailed | NtpTraceUpdateStatusClockOffset Microsecond - | NtpTraceSendLoopCollectedAllResponses - | NtpTraceSpawnNtpClientStarting - | NtpTraceSpawnNtpClientStarted - | NtpTraceSpawnNtpClientSocketsClosed - | NtpTraceSpawnNtpClientResolveDNS - | NtpTraceSpawnNtpClientResolvePending - | NtpTraceReceiveLoopDecodeError String - | NtpTraceReceiveLoopHandleIOException IOException - | NtpTraceReceiveLoopException - | NtpTraceReceiveLoopLatePacket + | NtpTraceSocketOpen + | NtpTraceSocketClosed + | NtpTracePacketSent + | NtpTracePacketSentError IOException + | NtpTraceClientWaitingForRepliesTimeout | NtpTraceReceiveLoopPacketReceived - | NtpTraceReceiveLoopPacketDeltaTime Microsecond - | NtpTraceMkSocketsNoSockets - | NtpTraceMkSocketsIOExecption IOException - | NtpTraceResolvHostIOException IOException - | NtpTraceResolveHostNotResolved String - | NtpTraceResolveHostResolved String -- todo also log addr + | NtpTraceClientSleeping + + + + + + + | NtpTraceSocketReaderDecodeError String + | NtpTraceSocketReaderIOException IOException + | NtpTraceQueryLoopIOException IOException + | NtpTraceOneshotClientIOError IOException + | NtpTraceSocketCreated String String - | NtpTraceSendPacketNoMatchingSocket String String - | NtpTraceSentToIOException String IOException - | NtpTraceSentTryResend String - | NtpTraceSentNotRetrying + deriving (Show) diff --git a/ntp-client/src/Network/NTP/Util.hs b/ntp-client/src/Network/NTP/Util.hs deleted file mode 100644 index 58f87f45fd1..00000000000 --- a/ntp-client/src/Network/NTP/Util.hs +++ /dev/null @@ -1,270 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} - -module Network.NTP.Util - ( ntpPort - , WithAddrFamily (..) - , runWithAddrFamily - , getAddrFamily - , AddrFamily (..) - , Addresses - , Sockets - , resolveNtpHost - , sendPacket - - , createAndBindSock - , udpLocalAddresses - - , pairThese - ) where - -import Control.Exception (Exception, IOException, catch, throw) -import Control.Monad (void) -import Control.Tracer -import Data.Bifunctor (Bifunctor (..)) -import Data.Binary (encode) -import Data.ByteString (ByteString) -import qualified Data.ByteString.Lazy as LBS -import Data.Foldable (traverse_) -import Data.List (find) -import Data.Semigroup (First (..), Last (..), Option (..), - Semigroup (..)) -import Data.These (These (..)) -import Network.Socket (AddrInfo, - AddrInfoFlag (AI_ADDRCONFIG, AI_PASSIVE), - Family (AF_INET, AF_INET6), PortNumber, SockAddr (..), - Socket, SocketOption (ReuseAddr), SocketType (Datagram), - addrAddress, addrFamily, addrFlags, addrSocketType) -import qualified Network.Socket as Socket -import qualified Network.Socket.ByteString as Socket.ByteString (sendTo) - -import Network.NTP.Packet (NtpPacket) -import Network.NTP.Trace (NtpTrace (..)) - -data AddrFamily = IPv4 | IPv6 - deriving Show - --- | --- Newtype wrapper which tags the type with either IPv4 or IPv6 phantom type. -data WithAddrFamily (t :: AddrFamily) a where - WithIPv6 :: a -> WithAddrFamily 'IPv6 a - WithIPv4 :: a -> WithAddrFamily 'IPv4 a - -instance Show a => Show (WithAddrFamily t a) where - show a = show (getAddrFamily a) ++ " " ++ show (runWithAddrFamily a) - -instance Eq a => Eq (WithAddrFamily t a) where - a == b = runWithAddrFamily a == runWithAddrFamily b - -instance Functor (WithAddrFamily t) where - fmap f (WithIPv6 a) = WithIPv6 (f a) - fmap f (WithIPv4 a) = WithIPv4 (f a) - -runWithAddrFamily :: WithAddrFamily t a -> a -runWithAddrFamily (WithIPv6 a) = a -runWithAddrFamily (WithIPv4 a) = a - -getAddrFamily :: WithAddrFamily t a -> AddrFamily -getAddrFamily (WithIPv6 _) = IPv6 -getAddrFamily (WithIPv4 _) = IPv4 - --- | --- Note that the composition of `foldThese . bimap f g` is a proof that --- @'These a b@ is the [free --- product](https://en.wikipedia.org/wiki/Free_product) of two semigroups @a@ --- and @b@. -foldThese - :: Semigroup a - => These a a - -> a -foldThese (This a) = a -foldThese (That a) = a -foldThese (These a1 a2) = a1 <> a2 - -pairThese - :: These a b - -> These x y - -> Maybe (These (a, x) (b, y)) -pairThese (These a b) (These x y) = Just $ These (a, x) (b, y) -pairThese (This a) (This x) = Just $ This (a, x) -pairThese (These a _) (This x) = Just $ This (a, x) -pairThese (This a) (These x _) = Just $ This (a, x) -pairThese (That b) (That y) = Just $ That (b, y) -pairThese (These _ b) (That y) = Just $ That (b, y) -pairThese (That b) (These _ y) = Just $ That (b, y) -pairThese _ _ = Nothing - --- | --- Store created sockets. If system supports IPv6 and IPv4 we create socket for --- IPv4 and IPv6. Otherwise only one. -type Sockets = These - (Last (WithAddrFamily 'IPv6 Socket)) - (Last (WithAddrFamily 'IPv4 Socket)) - --- | --- A counter part of @'Ntp.Client.Sockets'@ data type. -type Addresses = These - (First (WithAddrFamily 'IPv6 SockAddr)) - (First (WithAddrFamily 'IPv4 SockAddr)) - -ntpPort :: PortNumber -ntpPort = 123 - --- | --- Returns a list of alternatives. At most of length two, --- at most one ipv6 / ipv4 address. -resolveHost :: Tracer IO NtpTrace -> String -> IO (Maybe Addresses) -resolveHost tracer host = do - let hints = Socket.defaultHints - { addrSocketType = Datagram - , addrFlags = [AI_ADDRCONFIG] -- since we use @AF_INET@ family - } - -- TBD why catch here? Why not let @'resolveHost'@ throw the exception? - addrInfos <- Socket.getAddrInfo (Just hints) (Just host) Nothing - `catch` (\(e :: IOException) -> (traceWith tracer $ NtpTraceResolvHostIOException e) >> return []) - - let maddr = getOption $ foldMap fn addrInfos - case maddr of - Nothing -> traceWith tracer $ NtpTraceResolveHostNotResolved host - Just _addr -> traceWith tracer $ NtpTraceResolveHostResolved host - {- - where - g :: First (WithAddrFamily t SockAddr) -> [SockAddr] - g (First a) = [runWithAddrFamily a] - addrs :: [SockAddr] - addrs = foldThese . bimap g g $ addr - -} - - return maddr - where - -- Return supported address: one ipv6 and one ipv4 address. - fn :: AddrInfo -> Option Addresses - fn addr = case Socket.addrFamily addr of - Socket.AF_INET6 -> - Option $ Just $ This $ First $ (WithIPv6 $ Socket.addrAddress addr) - Socket.AF_INET -> - Option $ Just $ That $ First $ (WithIPv4 $ Socket.addrAddress addr) - _ -> mempty - -resolveNtpHost :: Tracer IO NtpTrace -> String -> IO (Maybe Addresses) -resolveNtpHost tracer host = do - addr <- resolveHost tracer host - return $ fmap (bimap adjustPort adjustPort) addr - where - adjustPort :: First (WithAddrFamily t SockAddr) -> First (WithAddrFamily t SockAddr) - adjustPort = fmap $ fmap (replacePort ntpPort) - -replacePort :: PortNumber -> SockAddr -> SockAddr -replacePort port (SockAddrInet _ host) = SockAddrInet port host -replacePort port (SockAddrInet6 _ flow host scope) = SockAddrInet6 port flow host scope -replacePort _ sockAddr = sockAddr - -createAndBindSock - :: Tracer IO NtpTrace - -> AddrFamily - -- ^ indicates which socket family to create, either @AF_INET6@ or @AF_INET@ - -> [AddrInfo] - -- ^ list of local addresses - -> IO (Maybe Sockets) -createAndBindSock tracer addressFamily addrs = - traverse createDo (selectAddr addrs) - where - selectAddr :: [AddrInfo] -> Maybe AddrInfo - selectAddr = find $ \addr -> - case addressFamily of - IPv6 -> addrFamily addr == AF_INET6 - IPv4 -> addrFamily addr == AF_INET - - createDo addr = do - sock <- Socket.socket (addrFamily addr) Datagram Socket.defaultProtocol - Socket.setSocketOption sock ReuseAddr 1 - Socket.bind sock (addrAddress addr) - traceWith tracer $ NtpTraceSocketCreated (show $ addrFamily addr) (show $ addrAddress addr) --- logInfo $ --- sformat ("Created socket (family/addr): "%shown%"/"%shown) --- (addrFamily addr) (addrAddress addr) - case addressFamily of - IPv6 -> return $ This $ Last $ (WithIPv6 sock) - IPv4 -> return $ That $ Last $ (WithIPv4 sock) - -udpLocalAddresses :: IO [AddrInfo] -udpLocalAddresses = do - let hints = Socket.defaultHints - { addrFlags = [AI_PASSIVE] - , addrSocketType = Datagram } -#if MIN_VERSION_network(2,8,0) - port = Socket.defaultPort -#else - port = Socket.aNY_PORT -#endif - -- Hints Host Service - Socket.getAddrInfo (Just hints) Nothing (Just $ show port) - -data SendToException - = NoMatchingSocket - | SendToIOException AddrFamily IOException - deriving Show - -instance Exception SendToException - - --- | --- Send a request to @addr :: Addresses@ using @sock :: Sockets@. -sendTo - :: Sockets - -> ByteString - -> Addresses - -- ^ addresses to send to - -> IO () -sendTo sock bs addr = case pairThese sock addr of - Just s -> foldThese $ bimap fn fn s - Nothing -> throw NoMatchingSocket - where - fn :: ( Last (WithAddrFamily t Socket) - , First (WithAddrFamily t SockAddr) - ) - -> IO () - fn (Last sock_, First addr_) = - void (Socket.ByteString.sendTo (runWithAddrFamily sock_) bs (runWithAddrFamily addr_)) - `catch` handleIOException (getAddrFamily addr_) - - handleIOException :: AddrFamily -> IOException -> IO () - handleIOException addressFamily e = throw (SendToIOException addressFamily e) - --- | --- Low level primitive which sends a request to a single NTP server. -sendPacket - :: Tracer IO NtpTrace - -> Sockets - -> NtpPacket - -> [Addresses] - -> IO () -sendPacket tracer sock packet addrs = do - let bs = LBS.toStrict $ encode $ packet - traverse_ - (\addr -> - (sendTo sock bs addr) - `catch` handleSendToException addr - ) - addrs - where - handleSendToException :: Addresses -> SendToException -> IO () - handleSendToException addr NoMatchingSocket = - traceWith tracer $ NtpTraceSendPacketNoMatchingSocket (show addr) (show sock) - handleSendToException addr (SendToIOException addressFamily ioerr) = do - traceWith tracer $ NtpTraceSentToIOException (show addressFamily) ioerr - case (addr, addressFamily) of - -- try to send the packet to the other address in case the current - -- system does not support IPv4/6. - (These _ r, IPv6) -> do - traceWith tracer $ NtpTraceSentTryResend $ show $ runWithAddrFamily $ getFirst r - sendPacket tracer sock packet [That r] - (These l _, IPv4) -> do - traceWith tracer $ NtpTraceSentTryResend $ show $ runWithAddrFamily $ getFirst l - sendPacket tracer sock packet [This l] - _ -> traceWith tracer $ NtpTraceSentNotRetrying diff --git a/ntp-client/test/NtpApp.hs b/ntp-client/test/NtpApp.hs deleted file mode 100644 index c84afbd7dd5..00000000000 --- a/ntp-client/test/NtpApp.hs +++ /dev/null @@ -1,25 +0,0 @@ -module Main -where -import Control.Concurrent (threadDelay) -import Control.Concurrent.STM (atomically) -import Control.Concurrent.Async -import Control.Monad -import Control.Tracer - -import Network.NTP.Client - -settings :: NtpClientSettings -settings = NtpClientSettings - { ntpServers = ["0.de.pool.ntp.org","0.europe.pool.ntp.org","0.pool.ntp.org"] - , ntpResponseTimeout = fromInteger 1000000 - , ntpPollDelay = fromInteger 3000000 - , ntpSelection = minimum - } - -main :: IO () -main = withNtpClient (contramapM (return . show) stdoutTracer) settings runClient - where - runClient ntpClient = race_ getLine $ forever $ do - status <- atomically $ ntpGetStatus ntpClient - traceWith stdoutTracer $ show ("main",status) - threadDelay 3000000 diff --git a/ntp-client/test/Test.hs b/ntp-client/test/Test.hs index 2ada6f89966..6c903dbd84c 100644 --- a/ntp-client/test/Test.hs +++ b/ntp-client/test/Test.hs @@ -6,15 +6,27 @@ module Main ) where import Data.Binary (decodeOrFail, encode) -import Data.Time.Units (Microsecond, fromMicroseconds, toMicroseconds) import Data.Word (Word32) import Test.Tasty (TestTree, defaultMain, testGroup) import Test.Tasty.QuickCheck (testProperty) -import Test.QuickCheck (Arbitrary (..), Gen, Property, NonNegative(..) , arbitrary, choose, counterexample, sized, (.&&.), (===)) - -import Network.NTP.Packet (NtpOffset (..), NtpPacket (..), clockOffsetPure, - ntpToRealMcs, realMcsToNtp) +import Test.QuickCheck ( Arbitrary (..) + , Gen + , Property + , NonNegative(..) + , arbitrary + , choose + , counterexample + , sized + , (.&&.) + , (===)) + +import Network.NTP.Packet ( Microsecond (..) + , NtpOffset (..) + , NtpPacket (..) + , clockOffsetPure + , ntpToRealMcs + , realMcsToNtp) main :: IO () main = defaultMain tests @@ -27,12 +39,12 @@ data NtpPacketWithOffset = NtpPacketWithOffset deriving (Show) genMicro :: Gen Microsecond -genMicro = fromMicroseconds <$> arbitrary +genMicro = Microsecond <$> arbitrary genMicroNotBefore :: Microsecond -> Gen Microsecond -genMicroNotBefore t = do +genMicroNotBefore (Microsecond t) = do (NonNegative offset) <- arbitrary - return $ fromMicroseconds $ toMicroseconds t + offset + return $ Microsecond $ t + offset newtype ArbitraryNtpPacket = ArbitraryNtpPacket NtpPacket @@ -54,7 +66,7 @@ instance Arbitrary ArbitraryNtpPacket where instance Arbitrary NtpPacketWithOffset where arbitrary = sized $ \offset -> do let drift :: Microsecond - drift = fromMicroseconds $ fromIntegral offset + drift = Microsecond $ fromIntegral offset ntpParams <- arbitrary ntpPoll <- arbitrary ntpOriginTime <- genMicro @@ -86,7 +98,7 @@ newtype NtpMicrosecond = NtpMicrosecond Microsecond -- Generate NtpMicrosecond which must be smaller than -- @'maxBound' \@Word32 - 2200898800@ (we substract 70 years in seconds). instance Arbitrary NtpMicrosecond where - arbitrary = (NtpMicrosecond . fromMicroseconds) <$> choose (0, endTime) + arbitrary = (NtpMicrosecond . Microsecond) <$> choose (0, endTime) where endTime = (fromIntegral $ maxBound @Word32) - 2208988800 tests:: TestTree diff --git a/stack.yaml b/stack.yaml index e97e4c167a4..f7a0ad95008 100644 --- a/stack.yaml +++ b/stack.yaml @@ -71,7 +71,6 @@ extra-deps: - hedgehog-quickcheck-0.1.1 - quickcheck-state-machine-0.6.0 - splitmix-0.0.2 - - time-units-1.0.0 - tasty-hedgehog-1.0.0.1 - Unique-0.4.7.6 - statistics-linreg-0.3 From 362fbdc8956ae4f0a0faaee8204d084efc4af4bc Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Mon, 27 Jan 2020 15:49:15 +0100 Subject: [PATCH 02/27] fix response timeout --- ntp-client/src/Network/NTP/Client.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index e12ecffb4f1..d82587cc605 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -255,7 +255,7 @@ queryServers tracer netSettings localAddr destAddrs loopForever = forever $ threadDelay maxBound timeout = do - threadDelay $ fromIntegral $ ntpResponseTimeout netSettings + threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs traceWith tracer NtpTraceClientWaitingForRepliesTimeout reader :: Socket -> TVar [ReceivedPacket] -> IO () From 82f535e746ca487b3d5588e9359bbc50630f2ffe Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Tue, 28 Jan 2020 14:05:19 +0100 Subject: [PATCH 03/27] split Client.hs module --- ntp-client/ntp-client.cabal | 2 + ntp-client/src/Network/NTP/Client.hs | 254 ++------------------------- ntp-client/src/Network/NTP/Query.hs | 220 +++++++++++++++++++++++ ntp-client/src/Network/NTP/Test.hs | 38 ++++ 4 files changed, 276 insertions(+), 238 deletions(-) create mode 100644 ntp-client/src/Network/NTP/Query.hs create mode 100644 ntp-client/src/Network/NTP/Test.hs diff --git a/ntp-client/ntp-client.cabal b/ntp-client/ntp-client.cabal index 9b40cbf7188..2cbfa8daff2 100644 --- a/ntp-client/ntp-client.cabal +++ b/ntp-client/ntp-client.cabal @@ -12,6 +12,8 @@ cabal-version: >=1.20 Library exposed-modules: Network.NTP.Client Network.NTP.Packet + Network.NTP.Query + Network.NTP.Test Network.NTP.Trace build-depends: async >=2.2 && <2.3 , base >=4.9 && <4.13 diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index d82587cc605..99927c0ea46 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -1,12 +1,4 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} -{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NumericUnderscores #-} - module Network.NTP.Client where @@ -14,49 +6,12 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM (STM, atomically, check) import Control.Concurrent.STM.TVar -import Control.Exception (bracket) -import System.IO.Error (catchIOError, tryIOError, userError, ioError) -import Control.Monad (forever, forM, forM_, when) +import System.IO.Error (catchIOError) +import Control.Monad import Control.Tracer -import Data.Binary (decodeOrFail, encode) -import qualified Data.ByteString.Lazy as LBS -import Data.List (find) -import Data.Maybe -import Network.Socket ( AddrInfo () - , AddrInfoFlag (AI_ADDRCONFIG, AI_PASSIVE) - , Family (AF_INET, AF_INET6) - , PortNumber - , Socket - , SockAddr (..) - , SocketOption (ReuseAddr) - , SocketType (Datagram) - , addrAddress - , addrFamily - , addrFlags - , addrSocketType) -import qualified Network.Socket as Socket -import qualified Network.Socket.ByteString as Socket.ByteString (recvFrom, sendManyTo) -import Network.NTP.Packet ( NtpPacket - , mkNtpPacket - , ntpPacketSize - , Microsecond - , NtpOffset (..) - , getCurrentTime - , clockOffsetPure) -import Network.NTP.Trace (NtpTrace (..), IPVersion (..)) - -main :: IO () -main = testClient -data NtpClientSettings = NtpClientSettings - { ntpServers :: [String] - -- ^ List of servers addresses. - , ntpResponseTimeout :: Microsecond - -- ^ Timeout between sending NTP requests and response collection. - , ntpPollDelay :: Microsecond - -- ^ How long to wait between two rounds of requests. - , ntpReportPolicy :: [ReceivedPacket] -> Maybe NtpOffset - } +import Network.NTP.Query +import Network.NTP.Trace data NtpClient = NtpClient { -- | Query the current NTP status. @@ -66,27 +21,6 @@ data NtpClient = NtpClient , ntpThread :: Async () } -data NtpStatus = - -- | The difference between NTP time and local system time - NtpDrift NtpOffset - -- | NTP client has send requests to the servers - | NtpSyncPending - -- | NTP is not available: the client has not received any respond within - -- `ntpResponseTimeout` or NTP was not configured. - | NtpSyncUnavailable deriving (Eq, Show) - -data ReceivedPacket = ReceivedPacket - { receivedPacket :: !NtpPacket - , receivedLocalTime :: !Microsecond - , receivedOffset :: !NtpOffset - } deriving (Eq, Show) - --- | Wait for at least three replies and report the minimum of the reported offsets. -minimumOfThree :: [ReceivedPacket] -> Maybe NtpOffset -minimumOfThree l - = if length l >= 3 then Just $ minimum $ map receivedOffset l - else Nothing - -- | Setup a NtpClient and run a application that uses that client. -- The NtpClient is terminated when the application returns. -- And also the application is terminated when the NtpClient crashes. @@ -94,7 +28,7 @@ withNtpClient :: Tracer IO NtpTrace -> NtpClientSettings -> (NtpClient -> IO a) withNtpClient tracer ntpSettings action = do traceWith tracer NtpTraceStartNtpClient ntpStatus <- newTVarIO NtpSyncPending - withAsync (ntpClientThread tracer (ntpSettings, ntpStatus)) $ \tid -> do + withAsync (ntpClientThread tracer ntpSettings ntpStatus) $ \tid -> do let client = NtpClient { ntpGetStatus = readTVar ntpStatus , ntpTriggerUpdate = do @@ -105,44 +39,6 @@ withNtpClient tracer ntpSettings action = do link tid -- an error in the ntp-client kills the appliction ! action client -udpLocalAddresses :: IO [AddrInfo] -udpLocalAddresses = do - let hints = Socket.defaultHints - { addrFlags = [AI_PASSIVE] - , addrSocketType = Datagram } - port = Socket.defaultPort - -- Hints Host Service - Socket.getAddrInfo (Just hints) Nothing (Just $ show port) - -resolveHost :: String -> IO [AddrInfo] -resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing - where - hints = Socket.defaultHints - { addrSocketType = Datagram - , addrFlags = [AI_ADDRCONFIG] -- since we use @AF_INET@ family - } - -firstAddr :: String -> [AddrInfo] -> IO (Maybe AddrInfo, Maybe AddrInfo) -firstAddr name l = case (find isV4Addr l, find isV6Addr l) of - (Nothing, Nothing) -> ioError $ userError $ "lookup host failed :" ++ name - p -> return p - where - isV4Addr :: AddrInfo -> Bool - isV4Addr addr = addrFamily addr == AF_INET - - isV6Addr :: AddrInfo -> Bool - isV6Addr addr = addrFamily addr == AF_INET6 - - -setNtpPort :: SockAddr -> SockAddr -setNtpPort addr = case addr of - (SockAddrInet _ host) -> SockAddrInet ntpPort host - (SockAddrInet6 _ flow host scope) -> SockAddrInet6 ntpPort flow host scope - sockAddr -> sockAddr - where - ntpPort :: PortNumber - ntpPort = 123 - threadDelayInterruptible :: TVar NtpStatus -> Int -> IO () threadDelayInterruptible tvar t = race_ @@ -155,140 +51,22 @@ threadDelayInterruptible tvar t -- TODO: maybe reset the delaytime if the oneshotClient did one sucessful query ntpClientThread :: Tracer IO NtpTrace - -> (NtpClientSettings, TVar NtpStatus) + -> NtpClientSettings + -> TVar NtpStatus -> IO () -ntpClientThread tracer args@(_, ntpStatus) = forM_ restartDelay $ \t -> do +ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do traceWith tracer $ NtpTraceRestartDelay t threadDelayInterruptible ntpStatus $ t * 1_000_000 traceWith tracer NtpTraceRestartingClient - (forever $ oneshotClient tracer args) `catchIOError` - \err -> traceWith tracer $ NtpTraceIOError err + catchIOError + (forever $ do + status <- oneshotClient tracer ntpSettings + atomically $ writeTVar ntpStatus status + traceWith tracer NtpTraceClientSleeping + threadDelayInterruptible ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings + ) + (\err -> traceWith tracer $ NtpTraceIOError err) atomically $ writeTVar ntpStatus NtpSyncUnavailable where restartDelay :: [Int] restartDelay = [0, 5, 10, 20, 60, 180, 600] ++ repeat 600 - --- | Setup and run the NTP client. --- In case of an IOError (for example when network interface goes down) cleanup and return. - -oneshotClient :: - Tracer IO NtpTrace - -> (NtpClientSettings, TVar NtpStatus) - -> IO () -oneshotClient tracer (ntpSettings, ntpStatus) = do - traceWith tracer NtpTraceClientStartQuery - (v4Servers, v6Servers) <- lookupServers $ ntpServers ntpSettings - (v4LocalAddr, v6LocalAddr) <- udpLocalAddresses >>= firstAddr "localhost" - (v4Replies, v6Replies) <- concurrently (runProtocol IPv4 v4LocalAddr v4Servers) - (runProtocol IPv6 v6LocalAddr v6Servers) - when (null v4Replies && null v6Replies) $ do - traceWith tracer NtpTraceIPv4IPv6BothFailed - atomically $ writeTVar ntpStatus NtpSyncUnavailable - ioError $ userError "IPv4 and IPv6 failed" - case (ntpReportPolicy ntpSettings) (v4Replies ++ v6Replies) of - Nothing -> do - traceWith tracer NtpTraceUpdateStatusQueryFailed - atomically $ writeTVar ntpStatus NtpSyncUnavailable - Just offset -> do - traceWith tracer $ NtpTraceUpdateStatusClockOffset $ getNtpOffset offset - atomically $ writeTVar ntpStatus $ NtpDrift offset - traceWith tracer NtpTraceClientSleeping - threadDelayInterruptible ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings - where - runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [ReceivedPacket] - runProtocol _version _localAddr [] = return [] - runProtocol _version Nothing _ = return [] - runProtocol version (Just addr) servers = do - queryServers tracer ntpSettings addr servers >>= \case - Left err -> do - traceWith tracer $ NtpTraceRunProtocolError version err - return [] - Right [] -> do - traceWith tracer $ NtpTraceRunProtocolNoResult version - return [] - Right r@(_:_) -> do - traceWith tracer $ NtpTraceRunProtocolSuccess version - return r - -queryServers :: - Tracer IO NtpTrace - -> NtpClientSettings - -> AddrInfo - -> [AddrInfo] - -> IO (Either IOError [ReceivedPacket]) -queryServers tracer netSettings localAddr destAddrs - = tryIOError $ bracket acquire release action - where - acquire :: IO Socket - acquire = do - s <- Socket.socket (addrFamily localAddr) Datagram Socket.defaultProtocol - traceWith tracer NtpTraceSocketOpen - return s - - release :: Socket -> IO () - release s = do - Socket.close s - traceWith tracer NtpTraceSocketClosed - - action :: Socket -> IO [ReceivedPacket] - action socket = do - Socket.setSocketOption socket ReuseAddr 1 - inQueue <- atomically $ newTVar [] - _err <- withAsync (send socket >> loopForever) $ \sender -> - withAsync timeout $ \delay -> - withAsync (reader socket inQueue ) $ \revc -> - waitAnyCancel [sender, delay, revc] - atomically $ readTVar inQueue - - send :: Socket -> IO () - send sock = forM_ destAddrs $ \addr -> do - p <- mkNtpPacket - err <- tryIOError $ Socket.ByteString.sendManyTo sock - (LBS.toChunks $ encode p) (setNtpPort $ Socket.addrAddress addr) - case err of - Right _ -> traceWith tracer NtpTracePacketSent - Left e -> do - traceWith tracer $ NtpTracePacketSentError e - ioError e - threadDelay 100_000 - - loopForever = forever $ threadDelay maxBound - - timeout = do - threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs - traceWith tracer NtpTraceClientWaitingForRepliesTimeout - - reader :: Socket -> TVar [ReceivedPacket] -> IO () - reader socket inQueue = forever $ do - (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize - t <- getCurrentTime - case decodeOrFail $ LBS.fromStrict bs of - Left (_, _, err) -> traceWith tracer $ NtpTraceSocketReaderDecodeError err - Right (_, _, packet) -> do - -- todo : filter bad packets, i.e. late packets and spoofed packets - traceWith tracer NtpTraceReceiveLoopPacketReceived - let received = ReceivedPacket packet t (clockOffsetPure packet t) - atomically $ modifyTVar' inQueue ((:) received) - -lookupServers :: [String] -> IO ([AddrInfo], [AddrInfo]) -lookupServers names = do - dests <- forM names $ \server -> resolveHost server >>= firstAddr server - return (mapMaybe fst dests, mapMaybe snd dests) - -testClient :: IO () -testClient = withNtpClient (contramapM (return . show) stdoutTracer) settings runApplication - where - runApplication ntpClient = race_ getLine $ forever $ do - status <- atomically $ ntpGetStatus ntpClient - traceWith stdoutTracer $ show ("main"::String, status) - threadDelay 10_000_000 - ntpTriggerUpdate ntpClient - - settings :: NtpClientSettings - settings = NtpClientSettings - { ntpServers = ["0.de.pool.ntp.org", "0.europe.pool.ntp.org", "0.pool.ntp.org" - , "1.pool.ntp.org", "2.pool.ntp.org", "3.pool.ntp.org"] - , ntpResponseTimeout = fromInteger 1_000_000 - , ntpPollDelay = fromInteger 300_000_000 - , ntpReportPolicy = minimumOfThree - } diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs new file mode 100644 index 00000000000..e2a1be8eef1 --- /dev/null +++ b/ntp-client/src/Network/NTP/Query.hs @@ -0,0 +1,220 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NumericUnderscores #-} + +module Network.NTP.Query +where + +import Control.Concurrent (threadDelay) +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Exception (bracket) +import System.IO.Error (tryIOError, userError, ioError) +import Control.Monad (forever, forM, forM_, when) +import Control.Tracer +import Data.Binary (decodeOrFail, encode) +import qualified Data.ByteString.Lazy as LBS +import Data.List (find) +import Data.Maybe +import Network.Socket ( AddrInfo + , AddrInfoFlag (AI_ADDRCONFIG, AI_PASSIVE) + , Family (AF_INET, AF_INET6) + , PortNumber + , Socket + , SockAddr (..) + , SocketOption (ReuseAddr) + , SocketType (Datagram) + , addrFamily + , addrFlags + , addrSocketType) +import qualified Network.Socket as Socket +import qualified Network.Socket.ByteString as Socket.ByteString (recvFrom, sendManyTo) +import Network.NTP.Packet ( NtpPacket + , mkNtpPacket + , ntpPacketSize + , Microsecond + , NtpOffset (..) + , getCurrentTime + , clockOffsetPure) +import Network.NTP.Trace (NtpTrace (..), IPVersion (..)) + +data NtpClientSettings = NtpClientSettings + { ntpServers :: [String] + -- ^ List of servers addresses. + , ntpResponseTimeout :: Microsecond + -- ^ Timeout between sending NTP requests and response collection. + , ntpPollDelay :: Microsecond + -- ^ How long to wait between two rounds of requests. + , ntpReportPolicy :: [ReceivedPacket] -> Maybe NtpOffset + } + +data NtpStatus = + -- | The difference between NTP time and local system time + NtpDrift NtpOffset + -- | NTP client has send requests to the servers + | NtpSyncPending + -- | NTP is not available: the client has not received any respond within + -- `ntpResponseTimeout` or NTP was not configured. + | NtpSyncUnavailable deriving (Eq, Show) + +data ReceivedPacket = ReceivedPacket + { receivedPacket :: !NtpPacket + , receivedLocalTime :: !Microsecond + , receivedOffset :: !NtpOffset + } deriving (Eq, Show) + +-- | Wait for at least three replies and report the minimum of the reported offsets. +minimumOfThree :: [ReceivedPacket] -> Maybe NtpOffset +minimumOfThree l + = if length l >= 3 then Just $ minimum $ map receivedOffset l + else Nothing +udpLocalAddresses :: IO [AddrInfo] +udpLocalAddresses = do + let hints = Socket.defaultHints + { addrFlags = [AI_PASSIVE] + , addrSocketType = Datagram } + port = Socket.defaultPort + -- Hints Host Service + Socket.getAddrInfo (Just hints) Nothing (Just $ show port) + +resolveHost :: String -> IO [AddrInfo] +resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing + where + hints = Socket.defaultHints + { addrSocketType = Datagram + , addrFlags = [AI_ADDRCONFIG] -- since we use @AF_INET@ family + } + +firstAddr :: String -> [AddrInfo] -> IO (Maybe AddrInfo, Maybe AddrInfo) +firstAddr name l = case (find isV4Addr l, find isV6Addr l) of + (Nothing, Nothing) -> ioError $ userError $ "lookup host failed :" ++ name + p -> return p + where + isV4Addr :: AddrInfo -> Bool + isV4Addr addr = addrFamily addr == AF_INET + + isV6Addr :: AddrInfo -> Bool + isV6Addr addr = addrFamily addr == AF_INET6 + + +setNtpPort :: SockAddr -> SockAddr +setNtpPort addr = case addr of + (SockAddrInet _ host) -> SockAddrInet ntpPort host + (SockAddrInet6 _ flow host scope) -> SockAddrInet6 ntpPort flow host scope + sockAddr -> sockAddr + where + ntpPort :: PortNumber + ntpPort = 123 + +-- | Setup and run the NTP client. +-- In case of an IOError (for example when network interface goes down) cleanup and return. + +oneshotClient :: + Tracer IO NtpTrace + -> NtpClientSettings + -> IO NtpStatus +oneshotClient tracer ntpSettings = do + traceWith tracer NtpTraceClientStartQuery + (v4Servers, v6Servers) <- lookupServers $ ntpServers ntpSettings + (v4LocalAddr, v6LocalAddr) <- udpLocalAddresses >>= firstAddr "localhost" +-- TODO: bug here !! +-- this is a race-condition runProtocol can throw IO erorr !! +-- NtpTracePacketSentError Network.Socket.ByteString.sendManyTo: does not exist (Network is unreachable) + (v4Replies, v6Replies) <- concurrently (runProtocol IPv4 v4LocalAddr v4Servers) + (runProtocol IPv6 v6LocalAddr v6Servers) + when (null v4Replies && null v6Replies) $ do + traceWith tracer NtpTraceIPv4IPv6BothFailed + ioError $ userError "IPv4 and IPv6 failed" + status <- case (ntpReportPolicy ntpSettings) (v4Replies ++ v6Replies) of + Nothing -> do + traceWith tracer NtpTraceUpdateStatusQueryFailed + return NtpSyncUnavailable + Just offset -> do + traceWith tracer $ NtpTraceUpdateStatusClockOffset $ getNtpOffset offset + return $ NtpDrift offset + return status + where + runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [ReceivedPacket] + runProtocol _version _localAddr [] = return [] + runProtocol _version Nothing _ = return [] + runProtocol version (Just addr) servers = do + runNtpQueries tracer ntpSettings addr servers >>= \case + Left err -> do + traceWith tracer $ NtpTraceRunProtocolError version err + return [] + Right [] -> do + traceWith tracer $ NtpTraceRunProtocolNoResult version + return [] + Right r@(_:_) -> do + traceWith tracer $ NtpTraceRunProtocolSuccess version + return r + +runNtpQueries :: + Tracer IO NtpTrace + -> NtpClientSettings + -> AddrInfo + -> [AddrInfo] + -> IO (Either IOError [ReceivedPacket]) +runNtpQueries tracer netSettings localAddr destAddrs + = tryIOError $ bracket acquire release action + where + acquire :: IO Socket + acquire = do + s <- Socket.socket (addrFamily localAddr) Datagram Socket.defaultProtocol + traceWith tracer NtpTraceSocketOpen + return s + + release :: Socket -> IO () + release s = do + Socket.close s + traceWith tracer NtpTraceSocketClosed + + action :: Socket -> IO [ReceivedPacket] + action socket = do + Socket.setSocketOption socket ReuseAddr 1 + inQueue <- atomically $ newTVar [] + _err <- withAsync (send socket >> loopForever) $ \sender -> + withAsync timeout $ \delay -> + withAsync (reader socket inQueue ) $ \revc -> + waitAnyCancel [sender, delay, revc] + atomically $ readTVar inQueue + + send :: Socket -> IO () + send sock = forM_ destAddrs $ \addr -> do + p <- mkNtpPacket + err <- tryIOError $ Socket.ByteString.sendManyTo sock + (LBS.toChunks $ encode p) (setNtpPort $ Socket.addrAddress addr) + case err of + Right _ -> traceWith tracer NtpTracePacketSent + Left e -> do + traceWith tracer $ NtpTracePacketSentError e + ioError e + threadDelay 100_000 + + loopForever = forever $ threadDelay maxBound + + timeout = do + threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs + traceWith tracer NtpTraceClientWaitingForRepliesTimeout + + reader :: Socket -> TVar [ReceivedPacket] -> IO () + reader socket inQueue = forever $ do + (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize + t <- getCurrentTime + case decodeOrFail $ LBS.fromStrict bs of + Left (_, _, err) -> traceWith tracer $ NtpTraceSocketReaderDecodeError err + Right (_, _, packet) -> do + -- todo : filter bad packets, i.e. late packets and spoofed packets + traceWith tracer NtpTraceReceiveLoopPacketReceived + let received = ReceivedPacket packet t (clockOffsetPure packet t) + atomically $ modifyTVar' inQueue ((:) received) + +lookupServers :: [String] -> IO ([AddrInfo], [AddrInfo]) +lookupServers names = do + dests <- forM names $ \server -> resolveHost server >>= firstAddr server + return (mapMaybe fst dests, mapMaybe snd dests) diff --git a/ntp-client/src/Network/NTP/Test.hs b/ntp-client/src/Network/NTP/Test.hs new file mode 100644 index 00000000000..0422d6ec127 --- /dev/null +++ b/ntp-client/src/Network/NTP/Test.hs @@ -0,0 +1,38 @@ +{-# LANGUAGE NumericUnderscores #-} +module Network.NTP.Test +where + +import Control.Concurrent (threadDelay) +import Control.Concurrent.STM (atomically) +import Control.Concurrent.Async +import Control.Monad (forever) +import Control.Tracer + +import Network.NTP.Client +import Network.NTP.Query + +testClient :: IO () +testClient = withNtpClient (contramapM (return . show) stdoutTracer) testSettings runApplication + where + runApplication ntpClient = race_ getLine $ forever $ do + status <- atomically $ ntpGetStatus ntpClient + traceWith stdoutTracer $ show ("main"::String, status) + threadDelay 10_000_000 + ntpTriggerUpdate ntpClient + +testOneshotClient :: IO () +testOneshotClient = forever $ do + status <- oneshotClient tracer testSettings + traceWith stdoutTracer $ show ("main"::String, status) + threadDelay 10_000_000 + where + tracer = contramapM (return . show) stdoutTracer + +testSettings :: NtpClientSettings +testSettings = NtpClientSettings + { ntpServers = ["0.de.pool.ntp.org", "0.europe.pool.ntp.org", "0.pool.ntp.org" + , "1.pool.ntp.org", "2.pool.ntp.org", "3.pool.ntp.org"] + , ntpResponseTimeout = fromInteger 1_000_000 + , ntpPollDelay = fromInteger 300_000_000 + , ntpReportPolicy = minimumOfThree + } From ccb74fd7b90a81566b9bf965115f96b53801436d Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Tue, 28 Jan 2020 14:06:41 +0100 Subject: [PATCH 04/27] remove unused Language Options --- ntp-client/src/Network/NTP/Query.hs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index e2a1be8eef1..25946ecc3ac 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -1,9 +1,3 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TypeApplications #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NumericUnderscores #-} From b183d50c920208bc28538d86e9863c1becb7046e Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 12:03:29 +0100 Subject: [PATCH 05/27] remove ReceivedPacket --- ntp-client/src/Network/NTP/Query.hs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 25946ecc3ac..98899defa78 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -28,8 +28,7 @@ import Network.Socket ( AddrInfo , addrSocketType) import qualified Network.Socket as Socket import qualified Network.Socket.ByteString as Socket.ByteString (recvFrom, sendManyTo) -import Network.NTP.Packet ( NtpPacket - , mkNtpPacket +import Network.NTP.Packet ( mkNtpPacket , ntpPacketSize , Microsecond , NtpOffset (..) @@ -44,7 +43,7 @@ data NtpClientSettings = NtpClientSettings -- ^ Timeout between sending NTP requests and response collection. , ntpPollDelay :: Microsecond -- ^ How long to wait between two rounds of requests. - , ntpReportPolicy :: [ReceivedPacket] -> Maybe NtpOffset + , ntpReportPolicy :: [NtpOffset] -> Maybe NtpOffset } data NtpStatus = @@ -56,17 +55,12 @@ data NtpStatus = -- `ntpResponseTimeout` or NTP was not configured. | NtpSyncUnavailable deriving (Eq, Show) -data ReceivedPacket = ReceivedPacket - { receivedPacket :: !NtpPacket - , receivedLocalTime :: !Microsecond - , receivedOffset :: !NtpOffset - } deriving (Eq, Show) - -- | Wait for at least three replies and report the minimum of the reported offsets. -minimumOfThree :: [ReceivedPacket] -> Maybe NtpOffset +minimumOfThree :: [NtpOffset] -> Maybe NtpOffset minimumOfThree l - = if length l >= 3 then Just $ minimum $ map receivedOffset l + = if length l >= 3 then Just $ minimum l else Nothing + udpLocalAddresses :: IO [AddrInfo] udpLocalAddresses = do let hints = Socket.defaultHints @@ -95,7 +89,6 @@ firstAddr name l = case (find isV4Addr l, find isV6Addr l) of isV6Addr :: AddrInfo -> Bool isV6Addr addr = addrFamily addr == AF_INET6 - setNtpPort :: SockAddr -> SockAddr setNtpPort addr = case addr of (SockAddrInet _ host) -> SockAddrInet ntpPort host @@ -133,7 +126,7 @@ oneshotClient tracer ntpSettings = do return $ NtpDrift offset return status where - runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [ReceivedPacket] + runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [NtpOffset] runProtocol _version _localAddr [] = return [] runProtocol _version Nothing _ = return [] runProtocol version (Just addr) servers = do @@ -153,7 +146,7 @@ runNtpQueries :: -> NtpClientSettings -> AddrInfo -> [AddrInfo] - -> IO (Either IOError [ReceivedPacket]) + -> IO (Either IOError [NtpOffset]) runNtpQueries tracer netSettings localAddr destAddrs = tryIOError $ bracket acquire release action where @@ -168,7 +161,7 @@ runNtpQueries tracer netSettings localAddr destAddrs Socket.close s traceWith tracer NtpTraceSocketClosed - action :: Socket -> IO [ReceivedPacket] + action :: Socket -> IO [NtpOffset] action socket = do Socket.setSocketOption socket ReuseAddr 1 inQueue <- atomically $ newTVar [] @@ -196,7 +189,7 @@ runNtpQueries tracer netSettings localAddr destAddrs threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs traceWith tracer NtpTraceClientWaitingForRepliesTimeout - reader :: Socket -> TVar [ReceivedPacket] -> IO () + reader :: Socket -> TVar [NtpOffset] -> IO () reader socket inQueue = forever $ do (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize t <- getCurrentTime @@ -205,8 +198,8 @@ runNtpQueries tracer netSettings localAddr destAddrs Right (_, _, packet) -> do -- todo : filter bad packets, i.e. late packets and spoofed packets traceWith tracer NtpTraceReceiveLoopPacketReceived - let received = ReceivedPacket packet t (clockOffsetPure packet t) - atomically $ modifyTVar' inQueue ((:) received) + let offset = (clockOffsetPure packet t) + atomically $ modifyTVar' inQueue ((:) offset) lookupServers :: [String] -> IO ([AddrInfo], [AddrInfo]) lookupServers names = do From f2b91babc83fe49a550f1f7056dfe9d70941231e Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 12:57:07 +0100 Subject: [PATCH 06/27] unfold firstAddr into call-sites --- ntp-client/src/Network/NTP/Query.hs | 30 ++++++++++++++++++++++------- ntp-client/src/Network/NTP/Trace.hs | 2 ++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 98899defa78..f9f9abfee6e 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -78,6 +78,10 @@ resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing , addrFlags = [AI_ADDRCONFIG] -- since we use @AF_INET@ family } +firstAddr :: Family -> [AddrInfo] -> Maybe AddrInfo +firstAddr family l = find ((==) family . addrFamily ) l + +{- firstAddr :: String -> [AddrInfo] -> IO (Maybe AddrInfo, Maybe AddrInfo) firstAddr name l = case (find isV4Addr l, find isV6Addr l) of (Nothing, Nothing) -> ioError $ userError $ "lookup host failed :" ++ name @@ -88,6 +92,7 @@ firstAddr name l = case (find isV4Addr l, find isV6Addr l) of isV6Addr :: AddrInfo -> Bool isV6Addr addr = addrFamily addr == AF_INET6 +-} setNtpPort :: SockAddr -> SockAddr setNtpPort addr = case addr of @@ -107,9 +112,14 @@ oneshotClient :: -> IO NtpStatus oneshotClient tracer ntpSettings = do traceWith tracer NtpTraceClientStartQuery - (v4Servers, v6Servers) <- lookupServers $ ntpServers ntpSettings - (v4LocalAddr, v6LocalAddr) <- udpLocalAddresses >>= firstAddr "localhost" --- TODO: bug here !! + (v4Servers, v6Servers) <- lookupServers tracer $ ntpServers ntpSettings + localAddrs <- udpLocalAddresses + (v4LocalAddr, v6LocalAddr) <- case (firstAddr AF_INET localAddrs, firstAddr AF_INET6 localAddrs) of + (Nothing, Nothing) -> do + traceWith tracer NtpTraceNoLocalAddr + ioError $ userError "no local address IPv4 and IPv6" + l -> return l + -- TODO: bug here !! -- this is a race-condition runProtocol can throw IO erorr !! -- NtpTracePacketSentError Network.Socket.ByteString.sendManyTo: does not exist (Network is unreachable) (v4Replies, v6Replies) <- concurrently (runProtocol IPv4 v4LocalAddr v4Servers) @@ -201,7 +211,13 @@ runNtpQueries tracer netSettings localAddr destAddrs let offset = (clockOffsetPure packet t) atomically $ modifyTVar' inQueue ((:) offset) -lookupServers :: [String] -> IO ([AddrInfo], [AddrInfo]) -lookupServers names = do - dests <- forM names $ \server -> resolveHost server >>= firstAddr server - return (mapMaybe fst dests, mapMaybe snd dests) +lookupServers :: Tracer IO NtpTrace -> [String] -> IO ([AddrInfo], [AddrInfo]) +lookupServers tracer names = do + dests <- forM names $ \server -> do + addr <- resolveHost server + case (firstAddr AF_INET addr, firstAddr AF_INET6 addr) of + (Nothing, Nothing) -> do + traceWith tracer $ NtpTraceLookupServerFailed server + ioError $ userError $ "lookup NTP server failed " ++ server + l -> return l + return (mapMaybe fst dests, mapMaybe snd dests) diff --git a/ntp-client/src/Network/NTP/Trace.hs b/ntp-client/src/Network/NTP/Trace.hs index 751a480a99c..ff57c95cb9e 100644 --- a/ntp-client/src/Network/NTP/Trace.hs +++ b/ntp-client/src/Network/NTP/Trace.hs @@ -17,6 +17,8 @@ data NtpTrace | NtpTraceRunProtocolNoResult !IPVersion | NtpTraceRunProtocolError !IPVersion IOError | NtpTraceIPv4IPv6BothFailed + | NtpTraceNoLocalAddr + | NtpTraceLookupServerFailed String | NtpTraceUpdateStatusQueryFailed | NtpTraceUpdateStatusClockOffset Microsecond | NtpTraceSocketOpen From 5479180ba78e0e7841ce4a11b73b7aa3ec3f435c Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 15:39:27 +0100 Subject: [PATCH 07/27] rename NtpClientSettings --- ntp-client/src/Network/NTP/Client.hs | 6 +++--- ntp-client/src/Network/NTP/Query.hs | 25 ++++++------------------- ntp-client/src/Network/NTP/Test.hs | 10 +++++----- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 99927c0ea46..04c1931c32e 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -24,7 +24,7 @@ data NtpClient = NtpClient -- | Setup a NtpClient and run a application that uses that client. -- The NtpClient is terminated when the application returns. -- And also the application is terminated when the NtpClient crashes. -withNtpClient :: Tracer IO NtpTrace -> NtpClientSettings -> (NtpClient -> IO a) -> IO a +withNtpClient :: Tracer IO NtpTrace -> NtpSettings -> (NtpClient -> IO a) -> IO a withNtpClient tracer ntpSettings action = do traceWith tracer NtpTraceStartNtpClient ntpStatus <- newTVarIO NtpSyncPending @@ -51,7 +51,7 @@ threadDelayInterruptible tvar t -- TODO: maybe reset the delaytime if the oneshotClient did one sucessful query ntpClientThread :: Tracer IO NtpTrace - -> NtpClientSettings + -> NtpSettings -> TVar NtpStatus -> IO () ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do @@ -60,7 +60,7 @@ ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do traceWith tracer NtpTraceRestartingClient catchIOError (forever $ do - status <- oneshotClient tracer ntpSettings + status <- ntpQuery tracer ntpSettings atomically $ writeTVar ntpStatus status traceWith tracer NtpTraceClientSleeping threadDelayInterruptible ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index f9f9abfee6e..e55ff18e725 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -36,7 +36,7 @@ import Network.NTP.Packet ( mkNtpPacket , clockOffsetPure) import Network.NTP.Trace (NtpTrace (..), IPVersion (..)) -data NtpClientSettings = NtpClientSettings +data NtpSettings = NtpSettings { ntpServers :: [String] -- ^ List of servers addresses. , ntpResponseTimeout :: Microsecond @@ -81,18 +81,6 @@ resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing firstAddr :: Family -> [AddrInfo] -> Maybe AddrInfo firstAddr family l = find ((==) family . addrFamily ) l -{- -firstAddr :: String -> [AddrInfo] -> IO (Maybe AddrInfo, Maybe AddrInfo) -firstAddr name l = case (find isV4Addr l, find isV6Addr l) of - (Nothing, Nothing) -> ioError $ userError $ "lookup host failed :" ++ name - p -> return p - where - isV4Addr :: AddrInfo -> Bool - isV4Addr addr = addrFamily addr == AF_INET - - isV6Addr :: AddrInfo -> Bool - isV6Addr addr = addrFamily addr == AF_INET6 --} setNtpPort :: SockAddr -> SockAddr setNtpPort addr = case addr of @@ -103,14 +91,13 @@ setNtpPort addr = case addr of ntpPort :: PortNumber ntpPort = 123 --- | Setup and run the NTP client. --- In case of an IOError (for example when network interface goes down) cleanup and return. +-- | Run a single NTP query -oneshotClient :: +ntpQuery :: Tracer IO NtpTrace - -> NtpClientSettings + -> NtpSettings -> IO NtpStatus -oneshotClient tracer ntpSettings = do +ntpQuery tracer ntpSettings = do traceWith tracer NtpTraceClientStartQuery (v4Servers, v6Servers) <- lookupServers tracer $ ntpServers ntpSettings localAddrs <- udpLocalAddresses @@ -153,7 +140,7 @@ oneshotClient tracer ntpSettings = do runNtpQueries :: Tracer IO NtpTrace - -> NtpClientSettings + -> NtpSettings -> AddrInfo -> [AddrInfo] -> IO (Either IOError [NtpOffset]) diff --git a/ntp-client/src/Network/NTP/Test.hs b/ntp-client/src/Network/NTP/Test.hs index 0422d6ec127..8bbba9e4a0b 100644 --- a/ntp-client/src/Network/NTP/Test.hs +++ b/ntp-client/src/Network/NTP/Test.hs @@ -20,16 +20,16 @@ testClient = withNtpClient (contramapM (return . show) stdoutTracer) testSetting threadDelay 10_000_000 ntpTriggerUpdate ntpClient -testOneshotClient :: IO () -testOneshotClient = forever $ do - status <- oneshotClient tracer testSettings +testNtpQuery :: IO () +testNtpQuery = forever $ do + status <- ntpQuery tracer testSettings traceWith stdoutTracer $ show ("main"::String, status) threadDelay 10_000_000 where tracer = contramapM (return . show) stdoutTracer -testSettings :: NtpClientSettings -testSettings = NtpClientSettings +testSettings :: NtpSettings +testSettings = NtpSettings { ntpServers = ["0.de.pool.ntp.org", "0.europe.pool.ntp.org", "0.pool.ntp.org" , "1.pool.ntp.org", "2.pool.ntp.org", "3.pool.ntp.org"] , ntpResponseTimeout = fromInteger 1_000_000 From dfcf10e1c2abdd1b930572e604068095ef29fef7 Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 16:19:00 +0100 Subject: [PATCH 08/27] add blocking query --- ntp-client/src/Network/NTP/Client.hs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 04c1931c32e..ca4136f2923 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -1,4 +1,5 @@ {-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE LambdaCase #-} module Network.NTP.Client where @@ -6,7 +7,7 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM (STM, atomically, check) import Control.Concurrent.STM.TVar -import System.IO.Error (catchIOError) +import System.IO.Error (catchIOError, tryIOError) import Control.Monad import Control.Tracer @@ -16,8 +17,10 @@ import Network.NTP.Trace data NtpClient = NtpClient { -- | Query the current NTP status. ntpGetStatus :: STM NtpStatus - -- | Bypass all internal threadDelays and trigger a new NTP query. + -- | Bypass all internal threadDelays and trigger a new NTP query (non-blocking). , ntpTriggerUpdate :: IO () + -- | Perform a query, update and return the NtpStatus (blocking). + , ntpQueryBlocking :: IO NtpStatus , ntpThread :: Async () } @@ -34,6 +37,14 @@ withNtpClient tracer ntpSettings action = do , ntpTriggerUpdate = do traceWith tracer NtpTraceClientActNow atomically $ writeTVar ntpStatus NtpSyncPending + , ntpQueryBlocking = tryIOError (ntpQuery tracer ntpSettings) >>= \case + Right status -> do + atomically $ writeTVar ntpStatus status + return status + Left err -> do + traceWith tracer $ NtpTraceIOError err + atomically $ writeTVar ntpStatus NtpSyncUnavailable + return NtpSyncUnavailable , ntpThread = tid } link tid -- an error in the ntp-client kills the appliction ! From 58a3ffa3d4dac1406af3bfb7a0fcf74a1311da4f Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 16:35:31 +0100 Subject: [PATCH 09/27] don't link the NTP client thread to the application thread by default --- ntp-client/src/Network/NTP/Client.hs | 5 ++--- ntp-client/src/Network/NTP/Test.hs | 12 +++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index ca4136f2923..5370ecc800b 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -20,13 +20,13 @@ data NtpClient = NtpClient -- | Bypass all internal threadDelays and trigger a new NTP query (non-blocking). , ntpTriggerUpdate :: IO () -- | Perform a query, update and return the NtpStatus (blocking). - , ntpQueryBlocking :: IO NtpStatus + , ntpQueryBlocking :: IO NtpStatus , ntpThread :: Async () } -- | Setup a NtpClient and run a application that uses that client. -- The NtpClient is terminated when the application returns. --- And also the application is terminated when the NtpClient crashes. +-- The application should use waitCatch on ntpThread. withNtpClient :: Tracer IO NtpTrace -> NtpSettings -> (NtpClient -> IO a) -> IO a withNtpClient tracer ntpSettings action = do traceWith tracer NtpTraceStartNtpClient @@ -47,7 +47,6 @@ withNtpClient tracer ntpSettings action = do return NtpSyncUnavailable , ntpThread = tid } - link tid -- an error in the ntp-client kills the appliction ! action client threadDelayInterruptible :: TVar NtpStatus -> Int -> IO () diff --git a/ntp-client/src/Network/NTP/Test.hs b/ntp-client/src/Network/NTP/Test.hs index 8bbba9e4a0b..f63d8e50cee 100644 --- a/ntp-client/src/Network/NTP/Test.hs +++ b/ntp-client/src/Network/NTP/Test.hs @@ -14,11 +14,13 @@ import Network.NTP.Query testClient :: IO () testClient = withNtpClient (contramapM (return . show) stdoutTracer) testSettings runApplication where - runApplication ntpClient = race_ getLine $ forever $ do - status <- atomically $ ntpGetStatus ntpClient - traceWith stdoutTracer $ show ("main"::String, status) - threadDelay 10_000_000 - ntpTriggerUpdate ntpClient + runApplication ntpClient = do + link $ ntpThread ntpClient -- propergate any errors in the NTP thread. + race_ getLine $ forever $ do + status <- atomically $ ntpGetStatus ntpClient + traceWith stdoutTracer $ show ("main"::String, status) + threadDelay 10_000_000 + ntpTriggerUpdate ntpClient testNtpQuery :: IO () testNtpQuery = forever $ do From b62f411647a9cfe8ca36c67b4f97fef34e3cd196 Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 16:55:31 +0100 Subject: [PATCH 10/27] address some reviews --- ntp-client/ntp-client.cabal | 6 ++---- ntp-client/src/Network/NTP/Client.hs | 8 ++++---- ntp-client/src/Network/NTP/Packet.hs | 1 - ntp-client/src/Network/NTP/Query.hs | 29 ++++++++++++++-------------- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/ntp-client/ntp-client.cabal b/ntp-client/ntp-client.cabal index 2cbfa8daff2..b38a1f8b173 100644 --- a/ntp-client/ntp-client.cabal +++ b/ntp-client/ntp-client.cabal @@ -26,10 +26,8 @@ Library hs-source-dirs: src default-language: Haskell2010 - ghc-options: -Wall - default-extensions: DeriveDataTypeable - DeriveGeneric - GeneralizedNewtypeDeriving + ghc-options: -Wall -Werror + default-extensions: GeneralizedNewtypeDeriving test-suite ntp-client-test hs-source-dirs: test, src diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 5370ecc800b..2bdc78b8022 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -49,8 +49,8 @@ withNtpClient tracer ntpSettings action = do } action client -threadDelayInterruptible :: TVar NtpStatus -> Int -> IO () -threadDelayInterruptible tvar t +awaitPendingWithTimeout :: TVar NtpStatus -> Int -> IO () +awaitPendingWithTimeout tvar t = race_ ( threadDelay t ) ( atomically $ do @@ -66,14 +66,14 @@ ntpClientThread :: -> IO () ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do traceWith tracer $ NtpTraceRestartDelay t - threadDelayInterruptible ntpStatus $ t * 1_000_000 + awaitPendingWithTimeout ntpStatus $ t * 1_000_000 traceWith tracer NtpTraceRestartingClient catchIOError (forever $ do status <- ntpQuery tracer ntpSettings atomically $ writeTVar ntpStatus status traceWith tracer NtpTraceClientSleeping - threadDelayInterruptible ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings + awaitPendingWithTimeout ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings ) (\err -> traceWith tracer $ NtpTraceIOError err) atomically $ writeTVar ntpStatus NtpSyncUnavailable diff --git a/ntp-client/src/Network/NTP/Packet.hs b/ntp-client/src/Network/NTP/Packet.hs index d0ea8a78642..4c0d06fb66b 100644 --- a/ntp-client/src/Network/NTP/Packet.hs +++ b/ntp-client/src/Network/NTP/Packet.hs @@ -1,6 +1,5 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ViewPatterns #-} module Network.NTP.Packet ( NtpPacket (..) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index e55ff18e725..536c49e1437 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -62,13 +62,14 @@ minimumOfThree l else Nothing udpLocalAddresses :: IO [AddrInfo] -udpLocalAddresses = do - let hints = Socket.defaultHints - { addrFlags = [AI_PASSIVE] - , addrSocketType = Datagram } - port = Socket.defaultPort - -- Hints Host Service - Socket.getAddrInfo (Just hints) Nothing (Just $ show port) +-- Hints Host Service +udpLocalAddresses = Socket.getAddrInfo (Just hints) Nothing (Just $ show port) + where + hints = Socket.defaultHints + { addrFlags = [AI_PASSIVE] + , addrSocketType = Datagram + } + port = Socket.defaultPort resolveHost :: String -> IO [AddrInfo] resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing @@ -81,18 +82,16 @@ resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing firstAddr :: Family -> [AddrInfo] -> Maybe AddrInfo firstAddr family l = find ((==) family . addrFamily ) l - setNtpPort :: SockAddr -> SockAddr setNtpPort addr = case addr of (SockAddrInet _ host) -> SockAddrInet ntpPort host (SockAddrInet6 _ flow host scope) -> SockAddrInet6 ntpPort flow host scope - sockAddr -> sockAddr - where - ntpPort :: PortNumber - ntpPort = 123 - --- | Run a single NTP query + sockAddr -> sockAddr + where + ntpPort :: PortNumber + ntpPort = 123 +-- | Perform a single NTP query and return the result. ntpQuery :: Tracer IO NtpTrace -> NtpSettings @@ -192,8 +191,8 @@ runNtpQueries tracer netSettings localAddr destAddrs t <- getCurrentTime case decodeOrFail $ LBS.fromStrict bs of Left (_, _, err) -> traceWith tracer $ NtpTraceSocketReaderDecodeError err + -- TODO : filter bad packets, i.e. late packets and spoofed packets Right (_, _, packet) -> do - -- todo : filter bad packets, i.e. late packets and spoofed packets traceWith tracer NtpTraceReceiveLoopPacketReceived let offset = (clockOffsetPure packet t) atomically $ modifyTVar' inQueue ((:) offset) From a5f0237e4df05c3fcbcba9bb4745735cb8e4d1cd Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Wed, 29 Jan 2020 18:32:24 +0100 Subject: [PATCH 11/27] use backround client thread for ntpQueryBlocking --- ntp-client/src/Network/NTP/Client.hs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 2bdc78b8022..a03968d41ea 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -7,7 +7,7 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM (STM, atomically, check) import Control.Concurrent.STM.TVar -import System.IO.Error (catchIOError, tryIOError) +import System.IO.Error (catchIOError) import Control.Monad import Control.Tracer @@ -35,16 +35,15 @@ withNtpClient tracer ntpSettings action = do let client = NtpClient { ntpGetStatus = readTVar ntpStatus , ntpTriggerUpdate = do - traceWith tracer NtpTraceClientActNow - atomically $ writeTVar ntpStatus NtpSyncPending - , ntpQueryBlocking = tryIOError (ntpQuery tracer ntpSettings) >>= \case - Right status -> do - atomically $ writeTVar ntpStatus status - return status - Left err -> do - traceWith tracer $ NtpTraceIOError err - atomically $ writeTVar ntpStatus NtpSyncUnavailable - return NtpSyncUnavailable + traceWith tracer NtpTraceClientActNow + atomically $ writeTVar ntpStatus NtpSyncPending + , ntpQueryBlocking = do + atomically $ writeTVar ntpStatus NtpSyncPending + status <- atomically $ do + s <- readTVar ntpStatus + check $ s /= NtpSyncPending + return s + return status , ntpThread = tid } action client From 0ee4f788c4a9f6d31e36f0ed21f41a670568cc09 Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Thu, 30 Jan 2020 12:51:20 +0100 Subject: [PATCH 12/27] cleanup NtpTrace type --- ntp-client/src/Network/NTP/Client.hs | 11 +++-- ntp-client/src/Network/NTP/Query.hs | 70 +++++++++++++++------------- ntp-client/src/Network/NTP/Trace.hs | 49 +++++++------------ 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index a03968d41ea..fdf91e28aaf 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -1,7 +1,9 @@ {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE LambdaCase #-} -module Network.NTP.Client -where +module Network.NTP.Client ( + NtpClient(..) + , withNtpClient + ) where import Control.Concurrent (threadDelay) import Control.Concurrent.Async @@ -35,9 +37,10 @@ withNtpClient tracer ntpSettings action = do let client = NtpClient { ntpGetStatus = readTVar ntpStatus , ntpTriggerUpdate = do - traceWith tracer NtpTraceClientActNow + traceWith tracer NtpTraceTriggerUpdate atomically $ writeTVar ntpStatus NtpSyncPending , ntpQueryBlocking = do + traceWith tracer NtpTraceTriggerUpdate atomically $ writeTVar ntpStatus NtpSyncPending status <- atomically $ do s <- readTVar ntpStatus @@ -57,7 +60,7 @@ awaitPendingWithTimeout tvar t check $ s == NtpSyncPending ) --- TODO: maybe reset the delaytime if the oneshotClient did one sucessful query +-- TODO: maybe reset the delaytime if ntpQuery did one sucessful query ntpClientThread :: Tracer IO NtpTrace -> NtpSettings diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 536c49e1437..ff183200863 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -1,15 +1,19 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NumericUnderscores #-} -module Network.NTP.Query -where +module Network.NTP.Query ( + NtpSettings(..) + , NtpStatus(..) + , minimumOfThree + , ntpQuery + ) where import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (bracket) import System.IO.Error (tryIOError, userError, ioError) -import Control.Monad (forever, forM, forM_, when) +import Control.Monad (forever, forM, forM_, replicateM_, when) import Control.Tracer import Data.Binary (decodeOrFail, encode) import qualified Data.ByteString.Lazy as LBS @@ -91,7 +95,19 @@ setNtpPort addr = case addr of ntpPort :: PortNumber ntpPort = 123 +lookupServers :: Tracer IO NtpTrace -> [String] -> IO ([AddrInfo], [AddrInfo]) +lookupServers tracer names = do + dests <- forM names $ \server -> do + addr <- resolveHost server + case (firstAddr AF_INET addr, firstAddr AF_INET6 addr) of + (Nothing, Nothing) -> do + traceWith tracer $ NtpTraceLookupServerFailed server + ioError $ userError $ "lookup NTP server failed " ++ server + l -> return l + return (mapMaybe fst dests, mapMaybe snd dests) + -- | Perform a single NTP query and return the result. +-- This function my throw an IO exception. ntpQuery :: Tracer IO NtpTrace -> NtpSettings @@ -115,47 +131,48 @@ ntpQuery tracer ntpSettings = do ioError $ userError "IPv4 and IPv6 failed" status <- case (ntpReportPolicy ntpSettings) (v4Replies ++ v6Replies) of Nothing -> do - traceWith tracer NtpTraceUpdateStatusQueryFailed + traceWith tracer NtpTraceReportPolicyQueryFailed return NtpSyncUnavailable Just offset -> do - traceWith tracer $ NtpTraceUpdateStatusClockOffset $ getNtpOffset offset + traceWith tracer $ NtpTraceQueryResult $ getNtpOffset offset return $ NtpDrift offset return status where runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [NtpOffset] - runProtocol _version _localAddr [] = return [] - runProtocol _version Nothing _ = return [] - runProtocol version (Just addr) servers = do - runNtpQueries tracer ntpSettings addr servers >>= \case + runProtocol _proto _localAddr [] = return [] + runProtocol _proto Nothing _ = return [] + runProtocol protocol (Just addr) servers = do + runNtpQueries tracer protocol ntpSettings addr servers >>= \case Left err -> do - traceWith tracer $ NtpTraceRunProtocolError version err + traceWith tracer $ NtpTraceRunProtocolError protocol err return [] Right [] -> do - traceWith tracer $ NtpTraceRunProtocolNoResult version + traceWith tracer $ NtpTraceRunProtocolNoResult protocol return [] Right r@(_:_) -> do - traceWith tracer $ NtpTraceRunProtocolSuccess version + traceWith tracer $ NtpTraceRunProtocolSuccess protocol return r runNtpQueries :: Tracer IO NtpTrace + -> IPVersion -> NtpSettings -> AddrInfo -> [AddrInfo] -> IO (Either IOError [NtpOffset]) -runNtpQueries tracer netSettings localAddr destAddrs +runNtpQueries tracer protocol netSettings localAddr destAddrs = tryIOError $ bracket acquire release action where acquire :: IO Socket acquire = do s <- Socket.socket (addrFamily localAddr) Datagram Socket.defaultProtocol - traceWith tracer NtpTraceSocketOpen + traceWith tracer $ NtpTraceSocketOpen protocol return s release :: Socket -> IO () release s = do Socket.close s - traceWith tracer NtpTraceSocketClosed + traceWith tracer $ NtpTraceSocketClosed protocol action :: Socket -> IO [NtpOffset] action socket = do @@ -173,9 +190,9 @@ runNtpQueries tracer netSettings localAddr destAddrs err <- tryIOError $ Socket.ByteString.sendManyTo sock (LBS.toChunks $ encode p) (setNtpPort $ Socket.addrAddress addr) case err of - Right _ -> traceWith tracer NtpTracePacketSent + Right _ -> traceWith tracer $ NtpTracePacketSent protocol Left e -> do - traceWith tracer $ NtpTracePacketSentError e + traceWith tracer $ NtpTracePacketSentError protocol e ioError e threadDelay 100_000 @@ -183,27 +200,16 @@ runNtpQueries tracer netSettings localAddr destAddrs timeout = do threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs - traceWith tracer NtpTraceClientWaitingForRepliesTimeout + traceWith tracer $ NtpTraceWaitingForRepliesTimeout protocol reader :: Socket -> TVar [NtpOffset] -> IO () - reader socket inQueue = forever $ do + reader socket inQueue = replicateM_ (length destAddrs) $ do (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize t <- getCurrentTime case decodeOrFail $ LBS.fromStrict bs of - Left (_, _, err) -> traceWith tracer $ NtpTraceSocketReaderDecodeError err + Left (_, _, err) -> traceWith tracer $ NtpTracePacketDecodeError protocol err -- TODO : filter bad packets, i.e. late packets and spoofed packets Right (_, _, packet) -> do - traceWith tracer NtpTraceReceiveLoopPacketReceived + traceWith tracer $ NtpTracePacketReceived protocol let offset = (clockOffsetPure packet t) atomically $ modifyTVar' inQueue ((:) offset) - -lookupServers :: Tracer IO NtpTrace -> [String] -> IO ([AddrInfo], [AddrInfo]) -lookupServers tracer names = do - dests <- forM names $ \server -> do - addr <- resolveHost server - case (firstAddr AF_INET addr, firstAddr AF_INET6 addr) of - (Nothing, Nothing) -> do - traceWith tracer $ NtpTraceLookupServerFailed server - ioError $ userError $ "lookup NTP server failed " ++ server - l -> return l - return (mapMaybe fst dests, mapMaybe snd dests) diff --git a/ntp-client/src/Network/NTP/Trace.hs b/ntp-client/src/Network/NTP/Trace.hs index ff57c95cb9e..c4adb32b7d7 100644 --- a/ntp-client/src/Network/NTP/Trace.hs +++ b/ntp-client/src/Network/NTP/Trace.hs @@ -1,44 +1,31 @@ module Network.NTP.Trace where -import Control.Exception (IOException) import Network.NTP.Packet (Microsecond) data IPVersion = IPv4 | IPv6 deriving (Show) data NtpTrace - = NtpTraceIOError IOError - | NtpTraceStartNtpClient - | NtpTraceClientActNow + = NtpTraceStartNtpClient + | NtpTraceTriggerUpdate | NtpTraceRestartDelay Int | NtpTraceRestartingClient + | NtpTraceClientSleeping + | NtpTraceIOError !IOError + | NtpTraceLookupServerFailed !String | NtpTraceClientStartQuery - | NtpTraceRunProtocolSuccess !IPVersion - | NtpTraceRunProtocolNoResult !IPVersion - | NtpTraceRunProtocolError !IPVersion IOError - | NtpTraceIPv4IPv6BothFailed | NtpTraceNoLocalAddr - | NtpTraceLookupServerFailed String - | NtpTraceUpdateStatusQueryFailed - | NtpTraceUpdateStatusClockOffset Microsecond - | NtpTraceSocketOpen - | NtpTraceSocketClosed - | NtpTracePacketSent - | NtpTracePacketSentError IOException - | NtpTraceClientWaitingForRepliesTimeout - | NtpTraceReceiveLoopPacketReceived - | NtpTraceClientSleeping - - - - - - - | NtpTraceSocketReaderDecodeError String - | NtpTraceSocketReaderIOException IOException - | NtpTraceQueryLoopIOException IOException - | NtpTraceOneshotClientIOError IOException - - | NtpTraceSocketCreated String String - + | NtpTraceIPv4IPv6BothFailed + | NtpTraceReportPolicyQueryFailed + | NtpTraceQueryResult !Microsecond + | NtpTraceRunProtocolError !IPVersion !IOError + | NtpTraceRunProtocolNoResult !IPVersion + | NtpTraceRunProtocolSuccess !IPVersion + | NtpTraceSocketOpen !IPVersion + | NtpTraceSocketClosed !IPVersion + | NtpTracePacketSent !IPVersion + | NtpTracePacketSentError !IPVersion !IOError + | NtpTracePacketDecodeError !IPVersion !String + | NtpTracePacketReceived !IPVersion + | NtpTraceWaitingForRepliesTimeout !IPVersion deriving (Show) From 02a6ce83c3cedcac202779e1cfb0ae383f623d1e Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Thu, 30 Jan 2020 17:15:48 +0100 Subject: [PATCH 13/27] no special case for no replies --- ntp-client/src/Network/NTP/Client.hs | 17 +++++++++-------- ntp-client/src/Network/NTP/Query.hs | 23 ++++++++++------------- ntp-client/src/Network/NTP/Trace.hs | 2 +- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index fdf91e28aaf..11507d5fe07 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -70,15 +70,16 @@ ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do traceWith tracer $ NtpTraceRestartDelay t awaitPendingWithTimeout ntpStatus $ t * 1_000_000 traceWith tracer NtpTraceRestartingClient - catchIOError - (forever $ do - status <- ntpQuery tracer ntpSettings - atomically $ writeTVar ntpStatus status - traceWith tracer NtpTraceClientSleeping - awaitPendingWithTimeout ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings - ) - (\err -> traceWith tracer $ NtpTraceIOError err) + catchIOError queryLoop (\err -> traceWith tracer $ NtpTraceIOError err) atomically $ writeTVar ntpStatus NtpSyncUnavailable where restartDelay :: [Int] restartDelay = [0, 5, 10, 20, 60, 180, 600] ++ repeat 600 + + queryLoop = ntpQuery tracer ntpSettings >>= \case + status@(NtpDrift _ ) -> do + atomically $ writeTVar ntpStatus status + traceWith tracer NtpTraceClientSleeping + awaitPendingWithTimeout ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings + queryLoop + _ -> return () diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index ff183200863..6f102a306cb 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -121,22 +121,19 @@ ntpQuery tracer ntpSettings = do traceWith tracer NtpTraceNoLocalAddr ioError $ userError "no local address IPv4 and IPv6" l -> return l - -- TODO: bug here !! --- this is a race-condition runProtocol can throw IO erorr !! --- NtpTracePacketSentError Network.Socket.ByteString.sendManyTo: does not exist (Network is unreachable) (v4Replies, v6Replies) <- concurrently (runProtocol IPv4 v4LocalAddr v4Servers) (runProtocol IPv6 v6LocalAddr v6Servers) - when (null v4Replies && null v6Replies) $ do - traceWith tracer NtpTraceIPv4IPv6BothFailed - ioError $ userError "IPv4 and IPv6 failed" - status <- case (ntpReportPolicy ntpSettings) (v4Replies ++ v6Replies) of - Nothing -> do - traceWith tracer NtpTraceReportPolicyQueryFailed + case v4Replies ++ v6Replies of + [] -> do + traceWith tracer NtpTraceIPv4IPv6NoReplies return NtpSyncUnavailable - Just offset -> do - traceWith tracer $ NtpTraceQueryResult $ getNtpOffset offset - return $ NtpDrift offset - return status + l -> case ntpReportPolicy ntpSettings $ l of + Nothing -> do + traceWith tracer NtpTraceReportPolicyQueryFailed + return NtpSyncUnavailable + Just offset -> do + traceWith tracer $ NtpTraceQueryResult $ getNtpOffset offset + return $ NtpDrift offset where runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [NtpOffset] runProtocol _proto _localAddr [] = return [] diff --git a/ntp-client/src/Network/NTP/Trace.hs b/ntp-client/src/Network/NTP/Trace.hs index c4adb32b7d7..860c049ffd0 100644 --- a/ntp-client/src/Network/NTP/Trace.hs +++ b/ntp-client/src/Network/NTP/Trace.hs @@ -15,7 +15,7 @@ data NtpTrace | NtpTraceLookupServerFailed !String | NtpTraceClientStartQuery | NtpTraceNoLocalAddr - | NtpTraceIPv4IPv6BothFailed + | NtpTraceIPv4IPv6NoReplies | NtpTraceReportPolicyQueryFailed | NtpTraceQueryResult !Microsecond | NtpTraceRunProtocolError !IPVersion !IOError From 9535da35af97ae44d69521baada74c0d870e3141 Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Thu, 30 Jan 2020 17:31:50 +0100 Subject: [PATCH 14/27] fix spelling and import --- ntp-client/src/Network/NTP/Client.hs | 4 ++-- ntp-client/src/Network/NTP/Query.hs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 11507d5fe07..70f6a1f21d2 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -19,7 +19,7 @@ import Network.NTP.Trace data NtpClient = NtpClient { -- | Query the current NTP status. ntpGetStatus :: STM NtpStatus - -- | Bypass all internal threadDelays and trigger a new NTP query (non-blocking). + -- | Bypass all internal thread Delays and trigger a new NTP query (non-blocking). , ntpTriggerUpdate :: IO () -- | Perform a query, update and return the NtpStatus (blocking). , ntpQueryBlocking :: IO NtpStatus @@ -60,7 +60,7 @@ awaitPendingWithTimeout tvar t check $ s == NtpSyncPending ) --- TODO: maybe reset the delaytime if ntpQuery did one sucessful query +-- TODO: Reset the delay time if ntpQuery did one successful query. ntpClientThread :: Tracer IO NtpTrace -> NtpSettings diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 6f102a306cb..dd69f3d3491 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -1,6 +1,5 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NumericUnderscores #-} - module Network.NTP.Query ( NtpSettings(..) , NtpStatus(..) @@ -13,7 +12,7 @@ import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (bracket) import System.IO.Error (tryIOError, userError, ioError) -import Control.Monad (forever, forM, forM_, replicateM_, when) +import Control.Monad (forever, forM, forM_, replicateM_) import Control.Tracer import Data.Binary (decodeOrFail, encode) import qualified Data.ByteString.Lazy as LBS From 5fac09786da39c4fe9ea6061bb6da0c9220c7ef5 Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Mon, 3 Feb 2020 19:53:17 +0100 Subject: [PATCH 15/27] address some reviews --- nix/pkgs.nix | 1 + ntp-client/ntp-client.cabal | 2 +- ntp-client/src/Network/NTP/Client.hs | 5 ----- ntp-client/src/Network/NTP/Query.hs | 19 ++++++++----------- ntp-client/src/Network/NTP/Test.hs | 5 +---- ntp-client/src/Network/NTP/Trace.hs | 2 +- 6 files changed, 12 insertions(+), 22 deletions(-) diff --git a/nix/pkgs.nix b/nix/pkgs.nix index 95692cbd0b1..d59c0c2b191 100644 --- a/nix/pkgs.nix +++ b/nix/pkgs.nix @@ -38,6 +38,7 @@ let packages.io-sim-classes.configureFlags = [ "--ghc-option=-Werror" ]; packages.Win32-network.configureFlags = [ "--ghc-option=-Werror" ]; packages.network-mux.configureFlags = [ "--ghc-option=-Werror" ]; + packages.ntp-client.configureFlags = [ "--ghc-option=-Werror" ]; packages.ouroboros-network.configureFlags = [ "--ghc-option=-Werror" ]; packages.ouroboros-network.flags.cddl = true; packages.ouroboros-network.components.tests.cddl.build-tools = [pkgs.cddl pkgs.cbor-diag]; diff --git a/ntp-client/ntp-client.cabal b/ntp-client/ntp-client.cabal index b38a1f8b173..782ef5d4052 100644 --- a/ntp-client/ntp-client.cabal +++ b/ntp-client/ntp-client.cabal @@ -26,7 +26,7 @@ Library hs-source-dirs: src default-language: Haskell2010 - ghc-options: -Wall -Werror + ghc-options: -Wall default-extensions: GeneralizedNewtypeDeriving test-suite ntp-client-test diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 70f6a1f21d2..883a991b3bb 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -20,8 +20,6 @@ data NtpClient = NtpClient { -- | Query the current NTP status. ntpGetStatus :: STM NtpStatus -- | Bypass all internal thread Delays and trigger a new NTP query (non-blocking). - , ntpTriggerUpdate :: IO () - -- | Perform a query, update and return the NtpStatus (blocking). , ntpQueryBlocking :: IO NtpStatus , ntpThread :: Async () } @@ -36,9 +34,6 @@ withNtpClient tracer ntpSettings action = do withAsync (ntpClientThread tracer ntpSettings ntpStatus) $ \tid -> do let client = NtpClient { ntpGetStatus = readTVar ntpStatus - , ntpTriggerUpdate = do - traceWith tracer NtpTraceTriggerUpdate - atomically $ writeTVar ntpStatus NtpSyncPending , ntpQueryBlocking = do traceWith tracer NtpTraceTriggerUpdate atomically $ writeTVar ntpStatus NtpSyncPending diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index dd69f3d3491..f7b4a60941d 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -41,17 +41,16 @@ import Network.NTP.Trace (NtpTrace (..), IPVersion (..)) data NtpSettings = NtpSettings { ntpServers :: [String] - -- ^ List of servers addresses. + -- ^ List of server addresses. At least three servers are needed. , ntpResponseTimeout :: Microsecond -- ^ Timeout between sending NTP requests and response collection. , ntpPollDelay :: Microsecond -- ^ How long to wait between two rounds of requests. - , ntpReportPolicy :: [NtpOffset] -> Maybe NtpOffset } data NtpStatus = -- | The difference between NTP time and local system time - NtpDrift NtpOffset + NtpDrift !NtpOffset -- | NTP client has send requests to the servers | NtpSyncPending -- | NTP is not available: the client has not received any respond within @@ -126,7 +125,7 @@ ntpQuery tracer ntpSettings = do [] -> do traceWith tracer NtpTraceIPv4IPv6NoReplies return NtpSyncUnavailable - l -> case ntpReportPolicy ntpSettings $ l of + l -> case minimumOfThree l of Nothing -> do traceWith tracer NtpTraceReportPolicyQueryFailed return NtpSyncUnavailable @@ -160,10 +159,7 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs = tryIOError $ bracket acquire release action where acquire :: IO Socket - acquire = do - s <- Socket.socket (addrFamily localAddr) Datagram Socket.defaultProtocol - traceWith tracer $ NtpTraceSocketOpen protocol - return s + acquire = Socket.socket (addrFamily localAddr) Datagram Socket.defaultProtocol release :: Socket -> IO () release s = do @@ -172,11 +168,12 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs action :: Socket -> IO [NtpOffset] action socket = do + traceWith tracer $ NtpTraceSocketOpen protocol Socket.setSocketOption socket ReuseAddr 1 inQueue <- atomically $ newTVar [] _err <- withAsync (send socket >> loopForever) $ \sender -> withAsync timeout $ \delay -> - withAsync (reader socket inQueue ) $ \revc -> + withAsync (receiver socket inQueue ) $ \revc -> waitAnyCancel [sender, delay, revc] atomically $ readTVar inQueue @@ -198,8 +195,8 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs traceWith tracer $ NtpTraceWaitingForRepliesTimeout protocol - reader :: Socket -> TVar [NtpOffset] -> IO () - reader socket inQueue = replicateM_ (length destAddrs) $ do + receiver :: Socket -> TVar [NtpOffset] -> IO () + receiver socket inQueue = replicateM_ (length destAddrs) $ do (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize t <- getCurrentTime case decodeOrFail $ LBS.fromStrict bs of diff --git a/ntp-client/src/Network/NTP/Test.hs b/ntp-client/src/Network/NTP/Test.hs index f63d8e50cee..9f4e1229137 100644 --- a/ntp-client/src/Network/NTP/Test.hs +++ b/ntp-client/src/Network/NTP/Test.hs @@ -3,7 +3,6 @@ module Network.NTP.Test where import Control.Concurrent (threadDelay) -import Control.Concurrent.STM (atomically) import Control.Concurrent.Async import Control.Monad (forever) import Control.Tracer @@ -17,10 +16,9 @@ testClient = withNtpClient (contramapM (return . show) stdoutTracer) testSetting runApplication ntpClient = do link $ ntpThread ntpClient -- propergate any errors in the NTP thread. race_ getLine $ forever $ do - status <- atomically $ ntpGetStatus ntpClient + status <- ntpQueryBlocking ntpClient traceWith stdoutTracer $ show ("main"::String, status) threadDelay 10_000_000 - ntpTriggerUpdate ntpClient testNtpQuery :: IO () testNtpQuery = forever $ do @@ -36,5 +34,4 @@ testSettings = NtpSettings , "1.pool.ntp.org", "2.pool.ntp.org", "3.pool.ntp.org"] , ntpResponseTimeout = fromInteger 1_000_000 , ntpPollDelay = fromInteger 300_000_000 - , ntpReportPolicy = minimumOfThree } diff --git a/ntp-client/src/Network/NTP/Trace.hs b/ntp-client/src/Network/NTP/Trace.hs index 860c049ffd0..051c20fb948 100644 --- a/ntp-client/src/Network/NTP/Trace.hs +++ b/ntp-client/src/Network/NTP/Trace.hs @@ -8,7 +8,7 @@ data IPVersion = IPv4 | IPv6 data NtpTrace = NtpTraceStartNtpClient | NtpTraceTriggerUpdate - | NtpTraceRestartDelay Int + | NtpTraceRestartDelay !Int | NtpTraceRestartingClient | NtpTraceClientSleeping | NtpTraceIOError !IOError From f94e50313a45694040fb21d02bf73acada293a5d Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Tue, 4 Feb 2020 19:44:06 +0100 Subject: [PATCH 16/27] improve comments --- ntp-client/src/Network/NTP/Client.hs | 2 +- ntp-client/src/Network/NTP/Query.hs | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 883a991b3bb..85f5e24ba8c 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -60,7 +60,7 @@ ntpClientThread :: Tracer IO NtpTrace -> NtpSettings -> TVar NtpStatus - -> IO () + -> IO Void ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do traceWith tracer $ NtpTraceRestartDelay t awaitPendingWithTimeout ntpStatus $ t * 1_000_000 diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index f7b4a60941d..938eb9381c4 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -76,9 +76,13 @@ udpLocalAddresses = Socket.getAddrInfo (Just hints) Nothing (Just $ show port) resolveHost :: String -> IO [AddrInfo] resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing where + -- The library uses AI_ADDRCONFIG as simple test if IPv4 or IPv6 are configured. + -- According to the documentation, AI_ADDRCONFIG is not available on all platforms, + -- but it is expected to work on win32, Mac OS X and Linux. + -- TODO: use addrInfoFlagImplemented :: AddrInfoFlag -> Bool to test if the flag is available. hints = Socket.defaultHints { addrSocketType = Datagram - , addrFlags = [AI_ADDRCONFIG] -- since we use @AF_INET@ family + , addrFlags = [AI_ADDRCONFIG] } firstAddr :: Family -> [AddrInfo] -> Maybe AddrInfo @@ -134,8 +138,8 @@ ntpQuery tracer ntpSettings = do return $ NtpDrift offset where runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [NtpOffset] - runProtocol _proto _localAddr [] = return [] - runProtocol _proto Nothing _ = return [] + runProtocol _proto _localAddr [] = return [] -- No servers found for that protocol. + runProtocol _proto Nothing _ = return [] -- No local interface for that protocol. runProtocol protocol (Just addr) servers = do runNtpQueries tracer protocol ntpSettings addr servers >>= \case Left err -> do From a96077108a84f8e9b0e574ac06aa9b4c1f743d2e Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:15:11 +0100 Subject: [PATCH 17/27] Do not export minimumOfThree It is not needed outside of `Query` module --- ntp-client/src/Network/NTP/Query.hs | 1 - 1 file changed, 1 deletion(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 938eb9381c4..0bded75af08 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -3,7 +3,6 @@ module Network.NTP.Query ( NtpSettings(..) , NtpStatus(..) - , minimumOfThree , ntpQuery ) where From e58e25bf9890f7e082542aa9c4c468006d51fe06 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:16:29 +0100 Subject: [PATCH 18/27] Remove forever loop which keeps a thread GHC will be able to free memory taken by the tread earlier. --- ntp-client/src/Network/NTP/Query.hs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 0bded75af08..a318b0e1587 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -9,9 +9,9 @@ module Network.NTP.Query ( import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM -import Control.Exception (bracket) +import Control.Exception (bracket, onException) import System.IO.Error (tryIOError, userError, ioError) -import Control.Monad (forever, forM, forM_, replicateM_) +import Control.Monad (forM, forM_, replicateM_) import Control.Tracer import Data.Binary (decodeOrFail, encode) import qualified Data.ByteString.Lazy as LBS @@ -174,10 +174,12 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs traceWith tracer $ NtpTraceSocketOpen protocol Socket.setSocketOption socket ReuseAddr 1 inQueue <- atomically $ newTVar [] - _err <- withAsync (send socket >> loopForever) $ \sender -> - withAsync timeout $ \delay -> - withAsync (receiver socket inQueue ) $ \revc -> - waitAnyCancel [sender, delay, revc] + _err <- + withAsync timeout $ \timeoutT -> + withAsync (receiver socket inQueue ) $ \receiverT -> do + senderT <- async (send socket) + waitAnyCancel [timeoutT, receiverT] + `onException` cancel senderT atomically $ readTVar inQueue send :: Socket -> IO () @@ -192,8 +194,6 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs ioError e threadDelay 100_000 - loopForever = forever $ threadDelay maxBound - timeout = do threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs traceWith tracer $ NtpTraceWaitingForRepliesTimeout protocol From d5f1e93b0d2983cfb57a6e2d42a2a2b3e850df3a Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:27:30 +0100 Subject: [PATCH 19/27] ntpQuery - add documentation --- ntp-client/src/Network/NTP/Client.hs | 2 +- ntp-client/src/Network/NTP/Query.hs | 68 +++++++++++++++++++--------- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 85f5e24ba8c..883a991b3bb 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -60,7 +60,7 @@ ntpClientThread :: Tracer IO NtpTrace -> NtpSettings -> TVar NtpStatus - -> IO Void + -> IO () ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do traceWith tracer $ NtpTraceRestartDelay t awaitPendingWithTimeout ntpStatus $ t * 1_000_000 diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index a318b0e1587..e36f952e42f 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -137,26 +137,34 @@ ntpQuery tracer ntpSettings = do return $ NtpDrift offset where runProtocol :: IPVersion -> Maybe AddrInfo -> [AddrInfo] -> IO [NtpOffset] - runProtocol _proto _localAddr [] = return [] -- No servers found for that protocol. - runProtocol _proto Nothing _ = return [] -- No local interface for that protocol. - runProtocol protocol (Just addr) servers = do - runNtpQueries tracer protocol ntpSettings addr servers >>= \case - Left err -> do - traceWith tracer $ NtpTraceRunProtocolError protocol err - return [] - Right [] -> do - traceWith tracer $ NtpTraceRunProtocolNoResult protocol - return [] - Right r@(_:_) -> do - traceWith tracer $ NtpTraceRunProtocolSuccess protocol - return r - -runNtpQueries :: - Tracer IO NtpTrace - -> IPVersion + -- no addresses to sent to + runProtocol _protocol _localAddr [] = return [] + -- local address is not configured, e.g. no IPv6 or IPv6 gateway. + runProtocol _protocol Nothing _ = return [] + -- local address is configured, remote address list is non empty + runProtocol protocol (Just addr) servers = do + runNtpQueries tracer protocol ntpSettings addr servers >>= \case + Left err -> do + traceWith tracer $ NtpTraceRunProtocolError protocol err + return [] + Right [] -> do + traceWith tracer $ NtpTraceRunProtocolNoResult protocol + return [] + Right r -> do + traceWith tracer $ NtpTraceRunProtocolSuccess protocol + return r + + +-- | Run an ntp query towards each address +-- +runNtpQueries + :: Tracer IO NtpTrace + -> IPVersion -- ^ address family, it must afree with local and remote + -- addresses -> NtpSettings - -> AddrInfo - -> [AddrInfo] + -> AddrInfo -- ^ local address + -> [AddrInfo] -- ^ remote addresses, they are assumed to have the same + -- family as the local address -> IO (Either IOError [NtpOffset]) runNtpQueries tracer protocol netSettings localAddr destAddrs = tryIOError $ bracket acquire release action @@ -182,22 +190,38 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs `onException` cancel senderT atomically $ readTVar inQueue + -- + -- sending thread; send a series of requests: one towards each address + -- send :: Socket -> IO () send sock = forM_ destAddrs $ \addr -> do p <- mkNtpPacket - err <- tryIOError $ Socket.ByteString.sendManyTo sock - (LBS.toChunks $ encode p) (setNtpPort $ Socket.addrAddress addr) + err <- tryIOError + $ Socket.ByteString.sendManyTo sock + (LBS.toChunks $ encode p) + (setNtpPort $ Socket.addrAddress addr) case err of Right _ -> traceWith tracer $ NtpTracePacketSent protocol Left e -> do traceWith tracer $ NtpTracePacketSentError protocol e ioError e + -- delay 100ms between sending requests, this avoids dealing with ntp + -- results at the same time from various ntp servers, and thus we + -- should get better results. threadDelay 100_000 + -- + -- timeout thread + -- timeout = do - threadDelay $ (fromIntegral $ ntpResponseTimeout netSettings) + 100_000 * length destAddrs + threadDelay + $ (fromIntegral $ ntpResponseTimeout netSettings) + + 100_000 * length destAddrs traceWith tracer $ NtpTraceWaitingForRepliesTimeout protocol + -- + -- receiving thread + -- receiver :: Socket -> TVar [NtpOffset] -> IO () receiver socket inQueue = replicateM_ (length destAddrs) $ do (bs, _) <- Socket.ByteString.recvFrom socket ntpPacketSize From 3dc5bb932da7c10e897aeae3fa641f1b5d70910f Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:29:37 +0100 Subject: [PATCH 20/27] Improved documentation and layout of NTP.Query module --- ntp-client/src/Network/NTP/Query.hs | 45 ++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index e36f952e42f..43bf4c3ebab 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -56,22 +56,32 @@ data NtpStatus = -- `ntpResponseTimeout` or NTP was not configured. | NtpSyncUnavailable deriving (Eq, Show) --- | Wait for at least three replies and report the minimum of the reported offsets. + +-- | Wait for at least three replies and report the minimum of the reported +-- offsets. +-- minimumOfThree :: [NtpOffset] -> Maybe NtpOffset minimumOfThree l - = if length l >= 3 then Just $ minimum l - else Nothing + = if length l >= 3 + then Just $ minimum l + else Nothing + +-- | Get a list local udp addresses. +-- udpLocalAddresses :: IO [AddrInfo] --- Hints Host Service -udpLocalAddresses = Socket.getAddrInfo (Just hints) Nothing (Just $ show port) +udpLocalAddresses = Socket.getAddrInfo (Just hints) Nothing Nothing where hints = Socket.defaultHints { addrFlags = [AI_PASSIVE] , addrSocketType = Datagram } - port = Socket.defaultPort + +-- | Resolve hostname into 'AddrInfo'. We use 'AI_ADDRCONFIG' so we get IPv4/6 +-- address only if the local. We don't need 'AI_V4MAPPED' which would be set +-- by default. +-- resolveHost :: String -> IO [AddrInfo] resolveHost host = Socket.getAddrInfo (Just hints) (Just host) Nothing where @@ -92,10 +102,13 @@ setNtpPort addr = case addr of (SockAddrInet _ host) -> SockAddrInet ntpPort host (SockAddrInet6 _ flow host scope) -> SockAddrInet6 ntpPort flow host scope sockAddr -> sockAddr - where - ntpPort :: PortNumber - ntpPort = 123 + where + ntpPort :: PortNumber + ntpPort = 123 + +-- | Resolve dns names +-- lookupServers :: Tracer IO NtpTrace -> [String] -> IO ([AddrInfo], [AddrInfo]) lookupServers tracer names = do dests <- forM names $ \server -> do @@ -107,8 +120,18 @@ lookupServers tracer names = do l -> return l return (mapMaybe fst dests, mapMaybe snd dests) --- | Perform a single NTP query and return the result. --- This function my throw an IO exception. + +-- | Perform a series of NTP queries: one for each dns name. Resolve each dns +-- name, get local addresses: both IPv4 and IPv6 and engage in ntp protocol +-- towards one ip address per address family per dns name, but only for address +-- families for which we have a local address. This is to avoid trying to send +-- IPv4/6 requests if IPv4/6 gateway is not configured. +-- +-- It may throw an `IOException`: +-- +-- * if neither IPv4 nor IPv6 address is configured +-- * if network I/O errors +-- ntpQuery :: Tracer IO NtpTrace -> NtpSettings From 029f0bcc1d0a7f9f668f64e72aa9fc1dee7ea369 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:31:45 +0100 Subject: [PATCH 21/27] ntp-client ghc-options The test suite does not need rts, -threaded and -N options set. --- ntp-client/ntp-client.cabal | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ntp-client/ntp-client.cabal b/ntp-client/ntp-client.cabal index 782ef5d4052..a3eb2d5a0ef 100644 --- a/ntp-client/ntp-client.cabal +++ b/ntp-client/ntp-client.cabal @@ -41,10 +41,7 @@ test-suite ntp-client-test , tasty , tasty-quickcheck default-language: Haskell2010 - ghc-options: -threaded - -rtsopts - -Wall - -with-rtsopts=-N + ghc-options: -Wall default-extensions: OverloadedStrings , DeriveDataTypeable , GeneralizedNewtypeDeriving From 744f6bb522d32c6a96075817492dc5f56cfb5360 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:33:13 +0100 Subject: [PATCH 22/27] ntpClientThread * use explicit recursion * queryLoop - it must write the returned value by 'ntpQuery', otherwise a thread waiting for the update will not receive it. It is up the the application to decide what to do if 'NtpSyncUnavailable' is received. * improve documentation * return type is 'IO Void', which clearly indicates that this is an infinite loop. --- ntp-client/src/Network/NTP/Client.hs | 58 +++++++++++++++++++--------- 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 883a991b3bb..61e9862dbf2 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -10,8 +10,8 @@ import Control.Concurrent.Async import Control.Concurrent.STM (STM, atomically, check) import Control.Concurrent.STM.TVar import System.IO.Error (catchIOError) -import Control.Monad import Control.Tracer +import Data.Void (Void) import Network.NTP.Query import Network.NTP.Trace @@ -21,7 +21,7 @@ data NtpClient = NtpClient ntpGetStatus :: STM NtpStatus -- | Bypass all internal thread Delays and trigger a new NTP query (non-blocking). , ntpQueryBlocking :: IO NtpStatus - , ntpThread :: Async () + , ntpThread :: Async Void } -- | Setup a NtpClient and run a application that uses that client. @@ -55,26 +55,46 @@ awaitPendingWithTimeout tvar t check $ s == NtpSyncPending ) +-- | ntp client thread which wakes up every 'ntpPollDelay' to make ntp queries. +-- It can be woken up earlier by setting 'NptStatus' to 'NtpSyncPending'. +-- -- TODO: Reset the delay time if ntpQuery did one successful query. ntpClientThread :: Tracer IO NtpTrace -> NtpSettings -> TVar NtpStatus - -> IO () -ntpClientThread tracer ntpSettings ntpStatus = forM_ restartDelay $ \t -> do - traceWith tracer $ NtpTraceRestartDelay t - awaitPendingWithTimeout ntpStatus $ t * 1_000_000 - traceWith tracer NtpTraceRestartingClient - catchIOError queryLoop (\err -> traceWith tracer $ NtpTraceIOError err) - atomically $ writeTVar ntpStatus NtpSyncUnavailable - where - restartDelay :: [Int] - restartDelay = [0, 5, 10, 20, 60, 180, 600] ++ repeat 600 + -> IO Void +ntpClientThread tracer ntpSettings ntpStatus = go 0 + where + -- outer loop of the ntp client. If inner loop errors we restart after the + -- 'delay' seconds + go :: Int -> IO Void + go delay | delay <= 0 = do + queryLoop + `catchIOError` (traceWith tracer . NtpTraceIOError) + atomically $ writeTVar ntpStatus NtpSyncUnavailable + go 5 + go delay = do + traceWith tracer $ NtpTraceRestartDelay delay + awaitPendingWithTimeout ntpStatus $ delay * 1_000_000 + traceWith tracer NtpTraceRestartingClient + queryLoop + `catchIOError` (traceWith tracer . NtpTraceIOError) + atomically $ writeTVar ntpStatus NtpSyncUnavailable + go (2 * delay `max` 600) - queryLoop = ntpQuery tracer ntpSettings >>= \case - status@(NtpDrift _ ) -> do - atomically $ writeTVar ntpStatus status - traceWith tracer NtpTraceClientSleeping - awaitPendingWithTimeout ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings - queryLoop - _ -> return () + -- inner loop of the ntp client. Note that 'nptQuery' will return either + -- 'NptDrift' or 'NptSyncUnavailable'. + queryLoop :: IO () + queryLoop = ntpQuery tracer ntpSettings >>= \case + status@NtpDrift{} -> do + atomically $ writeTVar ntpStatus status + traceWith tracer NtpTraceClientSleeping + awaitPendingWithTimeout ntpStatus $ fromIntegral $ ntpPollDelay ntpSettings + queryLoop + status@NtpSyncUnavailable -> + -- we need to update the status even if the result is + -- 'NptSyncUnavailable', so that the thread blocked on it will be + -- waken up. + atomically $ writeTVar ntpStatus status + NtpSyncPending -> error "ntpClientThread: impossible happend" From e112387598b6615e5cadd3f57d96acef8f57f4c5 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:36:19 +0100 Subject: [PATCH 23/27] npt-client - improve documentation --- ntp-client/src/Network/NTP/Client.hs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 61e9862dbf2..59ac33818d2 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -16,6 +16,10 @@ import Data.Void (Void) import Network.NTP.Query import Network.NTP.Trace + +-- | 'NtpClient' which recieves updates of the wall clcok drift every +-- 'ntpPollDelay'. It also allows to force engaging in ntp protocol. +-- data NtpClient = NtpClient { -- | Query the current NTP status. ntpGetStatus :: STM NtpStatus @@ -24,9 +28,11 @@ data NtpClient = NtpClient , ntpThread :: Async Void } --- | Setup a NtpClient and run a application that uses that client. --- The NtpClient is terminated when the application returns. --- The application should use waitCatch on ntpThread. + +-- | Setup a NtpClient and run an application that uses provided 'NtpClient'. +-- The 'NtpClient' is terminated when the callback returns. The application +-- can 'waitCatch' on 'ntpThread'. +-- withNtpClient :: Tracer IO NtpTrace -> NtpSettings -> (NtpClient -> IO a) -> IO a withNtpClient tracer ntpSettings action = do traceWith tracer NtpTraceStartNtpClient From 35a6b8ea30ab6407d6c60c2161a44eb3011386d9 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:41:06 +0100 Subject: [PATCH 24/27] Forceful ntp update When triggering a forceful update, check if the client is not already enaged in ntp protocols. If not trigger update, otherwise wait for completion of running ntp protocol instances. --- ntp-client/src/Network/NTP/Client.hs | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/ntp-client/src/Network/NTP/Client.hs b/ntp-client/src/Network/NTP/Client.hs index 59ac33818d2..e89b7e1bab0 100644 --- a/ntp-client/src/Network/NTP/Client.hs +++ b/ntp-client/src/Network/NTP/Client.hs @@ -9,6 +9,7 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM (STM, atomically, check) import Control.Concurrent.STM.TVar +import Control.Monad (when) import System.IO.Error (catchIOError) import Control.Tracer import Data.Void (Void) @@ -22,10 +23,12 @@ import Network.NTP.Trace -- data NtpClient = NtpClient { -- | Query the current NTP status. - ntpGetStatus :: STM NtpStatus - -- | Bypass all internal thread Delays and trigger a new NTP query (non-blocking). - , ntpQueryBlocking :: IO NtpStatus - , ntpThread :: Async Void + ntpGetStatus :: STM NtpStatus + -- | Force to update the ntp state, unless an ntp query is already + -- running. This is a blocking operation. + , ntpQueryBlocking :: IO NtpStatus + -- | Ntp client thread + , ntpThread :: Async Void } @@ -42,12 +45,17 @@ withNtpClient tracer ntpSettings action = do { ntpGetStatus = readTVar ntpStatus , ntpQueryBlocking = do traceWith tracer NtpTraceTriggerUpdate - atomically $ writeTVar ntpStatus NtpSyncPending - status <- atomically $ do - s <- readTVar ntpStatus - check $ s /= NtpSyncPending - return s - return status + -- trigger an update, unless an ntp query is not already + -- running + atomically $ do + status <- readTVar ntpStatus + when (status /= NtpSyncPending) + $ writeTVar ntpStatus NtpSyncPending + -- block until the state changes + atomically $ do + status <- readTVar ntpStatus + check $ status /= NtpSyncPending + return status , ntpThread = tid } action client From 9da24aca28cd158ac55647b0e5f2daed6dffce14 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Tue, 4 Feb 2020 19:46:51 +0100 Subject: [PATCH 25/27] Rename the test suite as test-ntp-client This matches other test suites in 'ouroboros-network' package. --- ntp-client/ntp-client.cabal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ntp-client/ntp-client.cabal b/ntp-client/ntp-client.cabal index a3eb2d5a0ef..4fc827947f6 100644 --- a/ntp-client/ntp-client.cabal +++ b/ntp-client/ntp-client.cabal @@ -29,7 +29,7 @@ Library ghc-options: -Wall default-extensions: GeneralizedNewtypeDeriving -test-suite ntp-client-test +test-suite test-ntp-client hs-source-dirs: test, src main-is: Test.hs type: exitcode-stdio-1.0 From 520049cc1a3a259ac55d70b1112b3a60ba2617bd Mon Sep 17 00:00:00 2001 From: MarcFontaine Date: Tue, 4 Feb 2020 22:14:24 +0100 Subject: [PATCH 26/27] run nix/regenerate.sh --- nix/.stack.nix/ntp-client.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nix/.stack.nix/ntp-client.nix b/nix/.stack.nix/ntp-client.nix index ab0a119c5c4..d6ceb2cbdf7 100644 --- a/nix/.stack.nix/ntp-client.nix +++ b/nix/.stack.nix/ntp-client.nix @@ -28,7 +28,7 @@ ]; }; tests = { - "ntp-client-test" = { + "test-ntp-client" = { depends = [ (hsPkgs.base) (hsPkgs.binary) From 981abc51aa4650462245901f0af675edb774aa90 Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Wed, 5 Feb 2020 09:06:47 +0100 Subject: [PATCH 27/27] runNtpQueries - more robust thread managment * mask async exception to guarantee that all threads are cancelled * no need to use `waitAnyCancel` inside `withAsync`` callback, `waitAny` is enough for threads started with `withAsync`. --- ntp-client/src/Network/NTP/Query.hs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/ntp-client/src/Network/NTP/Query.hs b/ntp-client/src/Network/NTP/Query.hs index 43bf4c3ebab..56e552de6a1 100644 --- a/ntp-client/src/Network/NTP/Query.hs +++ b/ntp-client/src/Network/NTP/Query.hs @@ -9,12 +9,13 @@ module Network.NTP.Query ( import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM -import Control.Exception (bracket, onException) +import Control.Exception (bracket, mask, throwIO) import System.IO.Error (tryIOError, userError, ioError) import Control.Monad (forM, forM_, replicateM_) import Control.Tracer import Data.Binary (decodeOrFail, encode) import qualified Data.ByteString.Lazy as LBS +import Data.Functor (void) import Data.List (find) import Data.Maybe import Network.Socket ( AddrInfo @@ -205,12 +206,17 @@ runNtpQueries tracer protocol netSettings localAddr destAddrs traceWith tracer $ NtpTraceSocketOpen protocol Socket.setSocketOption socket ReuseAddr 1 inQueue <- atomically $ newTVar [] - _err <- - withAsync timeout $ \timeoutT -> - withAsync (receiver socket inQueue ) $ \receiverT -> do - senderT <- async (send socket) - waitAnyCancel [timeoutT, receiverT] - `onException` cancel senderT + withAsync timeout $ \timeoutT -> + withAsync (receiver socket inQueue ) $ \receiverT -> + -- mask async exceptions to guarantee that the other threads are + -- cancelled correctly. 'timeoutT' and 'receiverT' threads were + -- started using 'withAsync', so they will be terminated when the + -- callbak either returns or errors. + mask $ \unmask -> + async (unmask $ send socket) >>= \senderT -> unmask $ + waitCatch senderT >>= \case + Left e -> throwIO e + Right _ -> void $ waitAny [timeoutT, receiverT] atomically $ readTVar inQueue --