From 88a6f1b166830e2ecb9bbd4cea87a0fc8d731b22 Mon Sep 17 00:00:00 2001 From: Diogo Biazus Date: Sat, 14 Dec 2024 12:54:39 -0500 Subject: [PATCH 1/2] Very akward wiring to get a PoC --- .gitmodules | 3 + cabal.project.local | 5 ++ postgres-websockets.cabal | 4 +- postgresql-replicant | 1 + src/PostgresWebsockets/Context.hs | 5 +- src/PostgresWebsockets/Middleware.hs | 7 +- src/PostgresWebsockets/ReplicantBroadcast.hs | 85 ++++++++++++++++++++ src/PostgresWebsockets/Server.hs | 1 + 8 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 .gitmodules create mode 100644 cabal.project.local create mode 160000 postgresql-replicant create mode 100644 src/PostgresWebsockets/ReplicantBroadcast.hs diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..1f55a21 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "postgresql-replicant"] + path = postgresql-replicant + url = git@github.com:diogob/postgresql-replicant.git diff --git a/cabal.project.local b/cabal.project.local new file mode 100644 index 0000000..09373b7 --- /dev/null +++ b/cabal.project.local @@ -0,0 +1,5 @@ +ignore-project: False +source-repository-package + type: git + location: /home/diogo/Projects/postgres-websockets/postgresql-replicant + tag: cabal-file diff --git a/postgres-websockets.cabal b/postgres-websockets.cabal index 21d143f..d52dc22 100644 --- a/postgres-websockets.cabal +++ b/postgres-websockets.cabal @@ -18,7 +18,7 @@ common warnings common language default-language: Haskell2010 - default-extensions: OverloadedStrings, LambdaCase, RecordWildCards, QuasiQuotes + default-extensions: OverloadedStrings, LambdaCase, RecordWildCards, QuasiQuotes, TypeApplications library import: warnings @@ -27,6 +27,7 @@ library exposed-modules: PostgresWebsockets , PostgresWebsockets.Broadcast , PostgresWebsockets.HasqlBroadcast + , PostgresWebsockets.ReplicantBroadcast , PostgresWebsockets.Claims , PostgresWebsockets.Config , APrelude @@ -53,6 +54,7 @@ library , mtl >=2.3.1 && <2.4 , async >=2.2.5 && <2.3 , postgresql-libpq >= 0.10.0 && < 0.12 + , postgresql-replicant ^>= 0.2 , retry >= 0.8.1.0 && < 0.10 , stm >= 2.5.0.0 && < 2.6 , stm-containers >= 1.1.0.2 && < 1.3 diff --git a/postgresql-replicant b/postgresql-replicant new file mode 160000 index 0000000..47012b8 --- /dev/null +++ b/postgresql-replicant @@ -0,0 +1 @@ +Subproject commit 47012b83b27c88f857f3c77140d38d911c4b759e diff --git a/src/PostgresWebsockets/Context.hs b/src/PostgresWebsockets/Context.hs index 063e362..9bbe1ab 100644 --- a/src/PostgresWebsockets/Context.hs +++ b/src/PostgresWebsockets/Context.hs @@ -21,11 +21,13 @@ import qualified Hasql.Connection.Setting.Connection as C import PostgresWebsockets.Broadcast (Multiplexer) import PostgresWebsockets.Config (AppConfig (..)) import PostgresWebsockets.HasqlBroadcast (newHasqlBroadcaster) +import PostgresWebsockets.ReplicantBroadcast (newReplicantBroadcaster) data Context = Context { ctxConfig :: AppConfig, ctxPool :: P.Pool, - ctxMulti :: Multiplexer, + ctxNotifications :: Multiplexer, + ctxChanges :: Multiplexer, ctxGetTime :: IO UTCTime } @@ -35,6 +37,7 @@ mkContext conf@AppConfig {..} shutdownServer = do Context conf <$> P.acquire config <*> newHasqlBroadcaster shutdown configListenChannel configRetries configReconnectInterval pgSettings + <*> newReplicantBroadcaster shutdown configRetries configReconnectInterval "" <*> mkGetTime where config = P.settings [P.staticConnectionSettings [C.connection $ C.string $ decodeUtf8 pgSettings]] diff --git a/src/PostgresWebsockets/Middleware.hs b/src/PostgresWebsockets/Middleware.hs index bd00ec0..3fb9c06 100644 --- a/src/PostgresWebsockets/Middleware.hs +++ b/src/PostgresWebsockets/Middleware.hs @@ -103,10 +103,11 @@ wsApp Context {..} pendingConn = Nothing -> pure () Just ch -> sendMessageWithTimestamp $ connectionOpenMessage (T.intercalate "," chs) ch - when (hasRead mode) $ - forM_ chs $ - flip (onMessage ctxMulti) $ + when (hasRead mode) $ do + forM_ (filter (/= "database.changes") chs) $ + flip (onMessage ctxNotifications) $ WS.sendTextData conn . B.payload + forM_ (filter (== "database.changes") chs) $ flip (onMessage ctxChanges) $ WS.sendTextData conn . B.payload when (hasWrite mode) $ notifySession conn sendNotification chs diff --git a/src/PostgresWebsockets/ReplicantBroadcast.hs b/src/PostgresWebsockets/ReplicantBroadcast.hs new file mode 100644 index 0000000..4c52394 --- /dev/null +++ b/src/PostgresWebsockets/ReplicantBroadcast.hs @@ -0,0 +1,85 @@ +-- | +-- Module : PostgresWebsockets.ReplicantBroadcast +-- Description : Build a Replicant based producer 'Multiplexer'. +-- +-- Uses Broadcast module adding database as a source producer. +-- This module provides a function to produce a 'Multiplexer' from a Replicant 'Connection'. +module PostgresWebsockets.ReplicantBroadcast + ( newReplicantBroadcaster, + -- re-export + relayMessages, + relayMessagesForever, + ) +where + +import APrelude +import Data.Aeson (encode) +import qualified Database.PostgreSQL.Replicant as PGR +import PostgresWebsockets.Broadcast + +-- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error. +-- This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners +newReplicantBroadcaster :: IO () -> Int -> Maybe Int -> ByteString -> IO Multiplexer +newReplicantBroadcaster onConnectionFailure _maxRetries = newReplicantBroadcasterForChanges onConnectionFailure + +-- + +-- | Returns a multiplexer from a channel and an IO Connection, listen for different database notifications on the provided channel using the connection produced. +-- +-- This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners +-- +-- To listen on channels *chat* +-- +-- @ +-- import Protolude +-- import PostgresWebsockets.ReplicantBroadcast +-- import PostgresWebsockets.Broadcast +-- import Replicant.Connection +-- +-- main = do +-- conOrError <- H.acquire "postgres://localhost/test_database" +-- let con = either (panic . show) id conOrError :: Connection +-- multi <- newReplicantBroadcaster con +-- +-- onMessage multi "chat" (\ch -> +-- forever $ fmap print (atomically $ readTChan ch) +-- @ +newReplicantBroadcasterForChanges :: IO () -> Maybe Int -> ByteString -> IO Multiplexer +newReplicantBroadcasterForChanges onConnectionFailure checkInterval _conURI = do + multi <- newMultiplexer openProducer $ const onConnectionFailure + case checkInterval of + Just i -> superviseMultiplexer multi i shouldRestart + _ -> pure () + void $ relayMessagesForever multi + return multi + where + toMsg :: Text -> Message + toMsg = Message "database.changes" + + shouldRestart = + pure False -- TODO implement this properly + openProducer msgQ = do + let settings = + PGR.PgSettings + "replicant" + Nothing + "postgres_ws_test" + "localhost" + "5432" + "second_test" + "100" + void $ do + print ("forking replicant producer" :: Text) + forkIO $ + PGR.withLogicalStream settings $ \changePayload -> do + print ("Change received!" :: Text) + print $ encode changePayload + writeMessage $ decodeUtf8 $ showBS $ encode changePayload + case changePayload of + PGR.InformationMessage _infoMsg -> + pure Nothing + PGR.ChangeMessage change -> + pure . Just $ PGR.changeNextLSN change + where + writeMessage m = atomically $ writeTQueue msgQ $ toMsg m + diff --git a/src/PostgresWebsockets/Server.hs b/src/PostgresWebsockets/Server.hs index 1cf33ba..0faf479 100644 --- a/src/PostgresWebsockets/Server.hs +++ b/src/PostgresWebsockets/Server.hs @@ -26,6 +26,7 @@ serve conf@AppConfig {..} = do let shutdown = putErrLn ("Broadcaster connection is dead" :: Text) >> putMVar shutdownSignal () ctx <- mkContext conf shutdown + putStrLn "Listening to changes..." let waitForShutdown cl = void $ forkIO (takeMVar shutdownSignal >> cl) appSettings = warpSettings waitForShutdown conf app = postgresWsMiddleware ctx $ logStdout $ maybe dummyApp staticApp' configPath From ac5ebfafebf8570efdc515ddb2f7dfd647b40e6e Mon Sep 17 00:00:00 2001 From: Diogo Biazus Date: Sun, 15 Dec 2024 16:04:12 -0500 Subject: [PATCH 2/2] Experimenting with sending arbitrary queries upon change --- sample-env | 2 +- src/PostgresWebsockets/Context.hs | 5 +++-- src/PostgresWebsockets/ReplicantBroadcast.hs | 21 +++++++++++++++----- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/sample-env b/sample-env index e3abbcf..1407b00 100644 --- a/sample-env +++ b/sample-env @@ -1,5 +1,5 @@ ## PostgreSQL URI where the server will connect to issue NOTIFY and LISTEN commands -export PGWS_DB_URI="postgres://localhost:5432/postgres" +export PGWS_DB_URI="postgres://localhost:5432/postgres_ws_test" ## Size of connection pool used to issue notify commands (LISTEN commands are always issued on the same connection that is not part of the pool). export PGWS_POOL_SIZE=10 diff --git a/src/PostgresWebsockets/Context.hs b/src/PostgresWebsockets/Context.hs index 9bbe1ab..d017b34 100644 --- a/src/PostgresWebsockets/Context.hs +++ b/src/PostgresWebsockets/Context.hs @@ -34,10 +34,11 @@ data Context = Context -- | Given a configuration and a shutdown action (performed when the Multiplexer's listen connection dies) produces the context necessary to run sessions mkContext :: AppConfig -> IO () -> IO Context mkContext conf@AppConfig {..} shutdownServer = do - Context conf + pool <- P.acquire config + pure (Context conf pool) <$> P.acquire config <*> newHasqlBroadcaster shutdown configListenChannel configRetries configReconnectInterval pgSettings - <*> newReplicantBroadcaster shutdown configRetries configReconnectInterval "" + <*> newReplicantBroadcaster shutdown configRetries configReconnectInterval pool "" <*> mkGetTime where config = P.settings [P.staticConnectionSettings [C.connection $ C.string $ decodeUtf8 pgSettings]] diff --git a/src/PostgresWebsockets/ReplicantBroadcast.hs b/src/PostgresWebsockets/ReplicantBroadcast.hs index 4c52394..3ec7045 100644 --- a/src/PostgresWebsockets/ReplicantBroadcast.hs +++ b/src/PostgresWebsockets/ReplicantBroadcast.hs @@ -15,11 +15,16 @@ where import APrelude import Data.Aeson (encode) import qualified Database.PostgreSQL.Replicant as PGR +import qualified Hasql.Decoders as Decode +import qualified Hasql.Encoders as SQL +import qualified Hasql.Pool as SQL +import qualified Hasql.Session as SQL +import qualified Hasql.Statement as SQL import PostgresWebsockets.Broadcast -- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error. -- This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners -newReplicantBroadcaster :: IO () -> Int -> Maybe Int -> ByteString -> IO Multiplexer +newReplicantBroadcaster :: IO () -> Int -> Maybe Int -> SQL.Pool -> ByteString -> IO Multiplexer newReplicantBroadcaster onConnectionFailure _maxRetries = newReplicantBroadcasterForChanges onConnectionFailure -- @@ -44,8 +49,8 @@ newReplicantBroadcaster onConnectionFailure _maxRetries = newReplicantBroadcaste -- onMessage multi "chat" (\ch -> -- forever $ fmap print (atomically $ readTChan ch) -- @ -newReplicantBroadcasterForChanges :: IO () -> Maybe Int -> ByteString -> IO Multiplexer -newReplicantBroadcasterForChanges onConnectionFailure checkInterval _conURI = do +newReplicantBroadcasterForChanges :: IO () -> Maybe Int -> SQL.Pool -> ByteString -> IO Multiplexer +newReplicantBroadcasterForChanges onConnectionFailure checkInterval pool _conURI = do multi <- newMultiplexer openProducer $ const onConnectionFailure case checkInterval of Just i -> superviseMultiplexer multi i shouldRestart @@ -74,12 +79,18 @@ newReplicantBroadcasterForChanges onConnectionFailure checkInterval _conURI = do PGR.withLogicalStream settings $ \changePayload -> do print ("Change received!" :: Text) print $ encode changePayload - writeMessage $ decodeUtf8 $ showBS $ encode changePayload + result <- SQL.use pool session + case result of + Right value -> writeMessage $ decodeUtf8 $ showBS $ encode value + Left _ -> print ("ERROR executing" :: Text) case changePayload of PGR.InformationMessage _infoMsg -> pure Nothing PGR.ChangeMessage change -> pure . Just $ PGR.changeNextLSN change where - writeMessage m = atomically $ writeTQueue msgQ $ toMsg m + session = SQL.statement () stmt + stmt = SQL.Statement "SELECT json_agg(row_to_json(t.*)) FROM test t" SQL.noParams singleJsonDecoder True + singleJsonDecoder = Decode.singleRow $ Decode.column $ Decode.nonNullable Decode.json + writeMessage m = atomically $ writeTQueue msgQ $ toMsg m