Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "postgresql-replicant"]
path = postgresql-replicant
url = git@github.com:diogob/postgresql-replicant.git
5 changes: 5 additions & 0 deletions cabal.project.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ignore-project: False
source-repository-package
type: git
location: /home/diogo/Projects/postgres-websockets/postgresql-replicant
tag: cabal-file
4 changes: 3 additions & 1 deletion postgres-websockets.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ common warnings

common language
default-language: Haskell2010
default-extensions: OverloadedStrings, LambdaCase, RecordWildCards, QuasiQuotes
default-extensions: OverloadedStrings, LambdaCase, RecordWildCards, QuasiQuotes, TypeApplications

library
import: warnings
Expand All @@ -27,6 +27,7 @@ library
exposed-modules: PostgresWebsockets
, PostgresWebsockets.Broadcast
, PostgresWebsockets.HasqlBroadcast
, PostgresWebsockets.ReplicantBroadcast
, PostgresWebsockets.Claims
, PostgresWebsockets.Config
, APrelude
Expand All @@ -53,6 +54,7 @@ library
, mtl >=2.3.1 && <2.4
, async >=2.2.5 && <2.3
, postgresql-libpq >= 0.10.0 && < 0.12
, postgresql-replicant ^>= 0.2
, retry >= 0.8.1.0 && < 0.10
, stm >= 2.5.0.0 && < 2.6
, stm-containers >= 1.1.0.2 && < 1.3
Expand Down
1 change: 1 addition & 0 deletions postgresql-replicant
Submodule postgresql-replicant added at 47012b
2 changes: 1 addition & 1 deletion sample-env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
## PostgreSQL URI where the server will connect to issue NOTIFY and LISTEN commands
export PGWS_DB_URI="postgres://localhost:5432/postgres"
export PGWS_DB_URI="postgres://localhost:5432/postgres_ws_test"

## Size of connection pool used to issue notify commands (LISTEN commands are always issued on the same connection that is not part of the pool).
export PGWS_POOL_SIZE=10
Expand Down
8 changes: 6 additions & 2 deletions src/PostgresWebsockets/Context.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ import qualified Hasql.Connection.Setting.Connection as C
import PostgresWebsockets.Broadcast (Multiplexer)
import PostgresWebsockets.Config (AppConfig (..))
import PostgresWebsockets.HasqlBroadcast (newHasqlBroadcaster)
import PostgresWebsockets.ReplicantBroadcast (newReplicantBroadcaster)

data Context = Context
{ ctxConfig :: AppConfig,
ctxPool :: P.Pool,
ctxMulti :: Multiplexer,
ctxNotifications :: Multiplexer,
ctxChanges :: Multiplexer,
ctxGetTime :: IO UTCTime
}

-- | Given a configuration and a shutdown action (performed when the Multiplexer's listen connection dies) produces the context necessary to run sessions
mkContext :: AppConfig -> IO () -> IO Context
mkContext conf@AppConfig {..} shutdownServer = do
Context conf
pool <- P.acquire config
pure (Context conf pool)
<$> P.acquire config
<*> newHasqlBroadcaster shutdown configListenChannel configRetries configReconnectInterval pgSettings
<*> newReplicantBroadcaster shutdown configRetries configReconnectInterval pool ""
<*> mkGetTime
where
config = P.settings [P.staticConnectionSettings [C.connection $ C.string $ decodeUtf8 pgSettings]]
Expand Down
7 changes: 4 additions & 3 deletions src/PostgresWebsockets/Middleware.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ wsApp Context {..} pendingConn =
Nothing -> pure ()
Just ch -> sendMessageWithTimestamp $ connectionOpenMessage (T.intercalate "," chs) ch

when (hasRead mode) $
forM_ chs $
flip (onMessage ctxMulti) $
when (hasRead mode) $ do
forM_ (filter (/= "database.changes") chs) $
flip (onMessage ctxNotifications) $
WS.sendTextData conn . B.payload
forM_ (filter (== "database.changes") chs) $ flip (onMessage ctxChanges) $ WS.sendTextData conn . B.payload

when (hasWrite mode) $
notifySession conn sendNotification chs
Expand Down
96 changes: 96 additions & 0 deletions src/PostgresWebsockets/ReplicantBroadcast.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
-- |
-- Module : PostgresWebsockets.ReplicantBroadcast
-- Description : Build a Replicant based producer 'Multiplexer'.
--
-- Uses Broadcast module adding database as a source producer.
-- This module provides a function to produce a 'Multiplexer' from a Replicant 'Connection'.
module PostgresWebsockets.ReplicantBroadcast
( newReplicantBroadcaster,
-- re-export
relayMessages,
relayMessagesForever,
)
where

import APrelude
import Data.Aeson (encode)
import qualified Database.PostgreSQL.Replicant as PGR
import qualified Hasql.Decoders as Decode
import qualified Hasql.Encoders as SQL
import qualified Hasql.Pool as SQL
import qualified Hasql.Session as SQL
import qualified Hasql.Statement as SQL
import PostgresWebsockets.Broadcast

-- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error.
-- This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners
newReplicantBroadcaster :: IO () -> Int -> Maybe Int -> SQL.Pool -> ByteString -> IO Multiplexer
newReplicantBroadcaster onConnectionFailure _maxRetries = newReplicantBroadcasterForChanges onConnectionFailure

--

-- | Returns a multiplexer from a channel and an IO Connection, listen for different database notifications on the provided channel using the connection produced.
--
-- This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners
--
-- To listen on channels *chat*
--
-- @
-- import Protolude
-- import PostgresWebsockets.ReplicantBroadcast
-- import PostgresWebsockets.Broadcast
-- import Replicant.Connection
--
-- main = do
-- conOrError <- H.acquire "postgres://localhost/test_database"
-- let con = either (panic . show) id conOrError :: Connection
-- multi <- newReplicantBroadcaster con
--
-- onMessage multi "chat" (\ch ->
-- forever $ fmap print (atomically $ readTChan ch)
-- @
newReplicantBroadcasterForChanges :: IO () -> Maybe Int -> SQL.Pool -> ByteString -> IO Multiplexer
newReplicantBroadcasterForChanges onConnectionFailure checkInterval pool _conURI = do
multi <- newMultiplexer openProducer $ const onConnectionFailure
case checkInterval of
Just i -> superviseMultiplexer multi i shouldRestart
_ -> pure ()
void $ relayMessagesForever multi
return multi
where
toMsg :: Text -> Message
toMsg = Message "database.changes"

shouldRestart =
pure False -- TODO implement this properly
openProducer msgQ = do
let settings =
PGR.PgSettings
"replicant"
Nothing
"postgres_ws_test"
"localhost"
"5432"
"second_test"
"100"
void $ do
print ("forking replicant producer" :: Text)
forkIO $
PGR.withLogicalStream settings $ \changePayload -> do
print ("Change received!" :: Text)
print $ encode changePayload
result <- SQL.use pool session
case result of
Right value -> writeMessage $ decodeUtf8 $ showBS $ encode value
Left _ -> print ("ERROR executing" :: Text)
case changePayload of
PGR.InformationMessage _infoMsg ->
pure Nothing
PGR.ChangeMessage change ->
pure . Just $ PGR.changeNextLSN change
where
session = SQL.statement () stmt
stmt = SQL.Statement "SELECT json_agg(row_to_json(t.*)) FROM test t" SQL.noParams singleJsonDecoder True
singleJsonDecoder = Decode.singleRow $ Decode.column $ Decode.nonNullable Decode.json

writeMessage m = atomically $ writeTQueue msgQ $ toMsg m
1 change: 1 addition & 0 deletions src/PostgresWebsockets/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serve conf@AppConfig {..} = do
let shutdown = putErrLn ("Broadcaster connection is dead" :: Text) >> putMVar shutdownSignal ()
ctx <- mkContext conf shutdown

putStrLn "Listening to changes..."
let waitForShutdown cl = void $ forkIO (takeMVar shutdownSignal >> cl)
appSettings = warpSettings waitForShutdown conf
app = postgresWsMiddleware ctx $ logStdout $ maybe dummyApp staticApp' configPath
Expand Down
Loading