Skip to content

Commit

Permalink
clean up and convert env -> syncEnv in places
Browse files Browse the repository at this point in the history
  • Loading branch information
Cmdv committed Mar 17, 2023
1 parent f359eca commit 1f3bd76
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 40 deletions.
1 change: 0 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Database.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ rollbackLedger syncEnv point =
pure Nothing
Left lsfs ->
Just . fmap fst <$> verifySnapshotPoint syncEnv (OnDisk <$> lsfs)
-- TODO: Vince
NoLedger _ -> pure Nothing

-- | This not only checks that the ledger and ChainSync points are equal, but also that the
Expand Down
8 changes: 4 additions & 4 deletions cardano-db-sync/src/Cardano/DbSync/Default.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ insertListBlocks ::
SyncEnv ->
[CardanoBlock] ->
IO (Either SyncNodeError ())
insertListBlocks env blocks = do
backend <- getBackend env
insertListBlocks synEnv blocks = do
backend <- getBackend synEnv
DB.runDbIohkLogging backend tracer
. runExceptT
$ do
traverse_ (applyAndInsertBlockMaybe env) blocks
traverse_ (applyAndInsertBlockMaybe synEnv) blocks
where
tracer = getTrace env
tracer = getTrace synEnv

applyAndInsertBlockMaybe ::
SyncEnv -> CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
Expand Down
6 changes: 3 additions & 3 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Offline.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ insertOfflineResults trce resultQueue = do
ResultError {} -> True

runOfflineFetchThread :: Trace IO Text -> SyncEnv -> IO ()
runOfflineFetchThread trce env = do
runOfflineFetchThread trce syncEnv = do
logInfo trce "Running Offline fetch thread"
forever $ do
threadDelay 60_000_000 -- 60 second sleep
xs <- blockingFlushTBQueue (envOfflineWorkQueue env)
xs <- blockingFlushTBQueue (envOfflineWorkQueue syncEnv)
manager <- Http.newManager tlsManagerSettings
now <- liftIO Time.getPOSIXTime
mapM_ (queueInsert <=< fetchOfflineData trce manager now) xs
where
queueInsert :: FetchResult -> IO ()
queueInsert = atomically . writeTBQueue (envOfflineResultQueue env)
queueInsert = atomically . writeTBQueue (envOfflineResultQueue syncEnv)

-- -------------------------------------------------------------------------------------------------

Expand Down
22 changes: 11 additions & 11 deletions cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,21 @@ getSlotDetailsNode ::
NoLedgerEnv ->
SlotNo ->
IO SlotDetails
getSlotDetailsNode env slot = do
einterp1 <- maybe (getHistoryInterpreter env) pure =<< atomically (fromStrictMaybe <$> readTVar interVar)
getSlotDetailsNode nlEnv slot = do
einterp1 <- maybe (getHistoryInterpreter nlEnv) pure =<< atomically (fromStrictMaybe <$> readTVar interVar)
case evalSlotDetails einterp1 of
Right sd -> insertCurrentTime sd
Left _ -> do
einterp2 <- getHistoryInterpreter env
einterp2 <- getHistoryInterpreter nlEnv
case evalSlotDetails einterp2 of
Left err -> panic $ "getSlotDetailsNode: " <> textShow err
Right sd -> insertCurrentTime sd
where
interVar = nleHistoryInterpreterVar env
interVar = nleHistoryInterpreterVar nlEnv

evalSlotDetails :: Interpreter (CardanoEras StandardCrypto) -> Either PastHorizonException SlotDetails
evalSlotDetails interp =
interpretQuery interp (querySlotDetails (nleSystemStart env) slot)
interpretQuery interp (querySlotDetails (nleSystemStart nlEnv) slot)

insertCurrentTime :: SlotDetails -> IO SlotDetails
insertCurrentTime sd = do
Expand All @@ -117,7 +117,7 @@ getSlotDetailsNode env slot = do
getHistoryInterpreter ::
NoLedgerEnv ->
IO CardanoInterpreter
getHistoryInterpreter env = do
getHistoryInterpreter nlEnv = do
respVar <- newEmptyTMVarIO
atomically $ putTMVar reqVar (BlockQuery $ QueryHardFork GetInterpreter, respVar)
res <- atomically $ takeTMVar respVar
Expand All @@ -129,17 +129,17 @@ getHistoryInterpreter env = do
atomically $ writeTVar interVar $ Strict.Just interp
pure interp
where
reqVar = unStateQueryTMVar $ nleQueryVar env
interVar = nleHistoryInterpreterVar env
tracer = nleTracer env
reqVar = unStateQueryTMVar $ nleQueryVar nlEnv
interVar = nleHistoryInterpreterVar nlEnv
tracer = nleTracer nlEnv

-- This is called during the ChainSync setup and loops forever. Queries can be posted to
-- it and responses retrieved via a TVar.
localStateQueryHandler ::
forall a.
NoLedgerEnv ->
LocalStateQueryClient CardanoBlock (Point CardanoBlock) (Query CardanoBlock) IO a
localStateQueryHandler env =
localStateQueryHandler nlEnv =
LocalStateQueryClient idleState
where
idleState :: IO (StateQuery.ClientStIdle CardanoBlock (Point CardanoBlock) (Query CardanoBlock) IO a)
Expand All @@ -160,4 +160,4 @@ localStateQueryHandler env =
idleState
}

reqVar = unStateQueryTMVar $ nleQueryVar env
reqVar = unStateQueryTMVar $ nleQueryVar nlEnv
42 changes: 21 additions & 21 deletions cardano-db-sync/src/Cardano/DbSync/Sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ runSyncNode metricsSetters trce iomgr aop snEveryFollowing snEveryLagging dbConn
Db.noLedgerMigrations backend trce
lift $ orDie renderSyncNodeError $ insertValidateGenesisDist trce backend (dncNetworkName syncNodeConfig) genCfg (useShelleyInit syncNodeConfig)
liftIO $ epochStartup (enpExtended syncNodeParams) trce backend
-- TODO: Vince: a lot of the downstream functions require HasLedgerEnv but is this right?

case genCfg of
GenesisCardano {} -> do
liftIO $ runSyncNodeClient metricsSetters syncEnv iomgr trce (enpSocketPath syncNodeParams)
Expand All @@ -204,19 +204,19 @@ runSyncNodeClient ::
Trace IO Text ->
SocketPath ->
IO ()
runSyncNodeClient metricsSetters env iomgr trce (SocketPath socketPath) = do
runSyncNodeClient metricsSetters syncEnv iomgr trce (SocketPath socketPath) = do
logInfo trce $ "localInitiatorNetworkApplication: connecting to node via " <> textShow socketPath
void $
subscribe
(localSnocket iomgr)
codecConfig
(envNetworkMagic env)
(envNetworkMagic syncEnv)
networkSubscriptionTracers
clientSubscriptionParams
(dbSyncProtocols trce env metricsSetters)
(dbSyncProtocols trce syncEnv metricsSetters)
where
codecConfig :: CodecConfig CardanoBlock
codecConfig = configCodec $ getTopLevelConfig env
codecConfig = configCodec $ getTopLevelConfig syncEnv

clientSubscriptionParams =
ClientSubscriptionParams
Expand Down Expand Up @@ -259,7 +259,7 @@ dbSyncProtocols ::
ClientCodecs CardanoBlock IO ->
ConnectionId LocalAddress ->
NodeToClientProtocols 'InitiatorMode BSL.ByteString IO () Void
dbSyncProtocols trce env metricsSetters _version codecs _connectionId =
dbSyncProtocols trce syncEnv metricsSetters _version codecs _connectionId =
NodeToClientProtocols
{ localChainSyncProtocol = localChainSyncPtcl
, localTxSubmissionProtocol = dummylocalTxSubmit
Expand All @@ -272,20 +272,20 @@ dbSyncProtocols trce env metricsSetters _version codecs _connectionId =
localChainSyncTracer = toLogObject $ appendName "ChainSync" trce

tracer :: Trace IO Text
tracer = getTrace env
tracer = getTrace syncEnv

localChainSyncPtcl :: RunMiniProtocol 'InitiatorMode BSL.ByteString IO () Void
localChainSyncPtcl = InitiatorProtocolOnly $
MuxPeerRaw $ \channel ->
liftIO . logException trce "ChainSyncWithBlocksPtcl: " $ do
Db.runIohkLogging trce $
withPostgresqlConn (envConnString env) $ \backend -> liftIO $ do
replaceConnection env backend
setConsistentLevel env Unchecked
withPostgresqlConn (envConnString syncEnv) $ \backend -> liftIO $ do
replaceConnection syncEnv backend
setConsistentLevel syncEnv Unchecked

isFixed <- getIsSyncFixed env
let skipFix = soptSkipFix $ envOptions env
let onlyFix = soptOnlyFix $ envOptions env
isFixed <- getIsSyncFixed syncEnv
let skipFix = soptSkipFix $ envOptions syncEnv
let onlyFix = soptOnlyFix $ envOptions syncEnv
if onlyFix || (not isFixed && not skipFix)
then do
fd <- runDbIohkLogging backend tracer $ getWrongPlutusData tracer
Expand All @@ -298,15 +298,15 @@ dbSyncProtocols trce env metricsSetters _version codecs _connectionId =
( Client.chainSyncClientPeer $
chainSyncClientFix backend tracer fd
)
setIsFixedAndMigrate env
setIsFixedAndMigrate syncEnv
when onlyFix $ panic "All Good! This error is only thrown to exit db-sync." -- TODO fix.
else do
when skipFix $ setIsFixedAndMigrate env
when skipFix $ setIsFixedAndMigrate syncEnv
-- The Db thread is not forked at this point, so we can use
-- the connection here. A connection cannot be used concurrently by many
-- threads
logInfo trce "Starting chainSyncClient"
latestPoints <- getLatestPoints env
latestPoints <- getLatestPoints syncEnv
let (inMemory, onDisk) = List.span snd latestPoints
logInfo trce $
mconcat
Expand All @@ -315,15 +315,15 @@ dbSyncProtocols trce env metricsSetters _version codecs _connectionId =
, " and from disk: "
, textShow (fst <$> onDisk)
]
currentTip <- getCurrentTipBlockNo env
logDbState env
currentTip <- getCurrentTipBlockNo syncEnv
logDbState syncEnv
-- communication channel between datalayer thread and chainsync-client thread
actionQueue <- newDbActionQueue

race_
( race
(runDbThread env metricsSetters actionQueue)
(runOfflineFetchThread trce env)
(runDbThread syncEnv metricsSetters actionQueue)
(runOfflineFetchThread trce syncEnv)
)
( runPipelinedPeer
localChainSyncTracer
Expand Down Expand Up @@ -351,7 +351,7 @@ dbSyncProtocols trce env metricsSetters _version codecs _connectionId =

localStateQuery :: RunMiniProtocol 'InitiatorMode BSL.ByteString IO () Void
localStateQuery =
case envLedgerEnv env of
case envLedgerEnv syncEnv of
HasLedger _ ->
InitiatorProtocolOnly $
MuxPeer
Expand Down

0 comments on commit 1f3bd76

Please sign in to comment.