Skip to content

Commit

Permalink
Fixed updateRecord calls in DataSync failing when the patch size is m…
Browse files Browse the repository at this point in the history
…ore than 8000 bytes
  • Loading branch information
mpscholten committed Jan 28, 2023
1 parent e04b365 commit 694ff87
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 17 deletions.
78 changes: 63 additions & 15 deletions IHP/DataSync/ChangeNotifications.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ module IHP.DataSync.ChangeNotifications
( channelName
, ChangeNotification (..)
, Change (..)
, ChangeSet (..)
, createNotificationFunction
, installTableChangeTriggers
, makeCachedInstallTableChangeTriggers
, retrieveChanges
) where

import IHP.Prelude
Expand All @@ -21,13 +23,20 @@ import qualified IHP.DataSync.RowLevelSecurity as RLS
import qualified IHP.PGListener as PGListener
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.UUID as UUID

data ChangeNotification
= DidInsert { id :: !UUID }
| DidUpdate { id :: !UUID, changeSet :: ![Change] }
| DidUpdate { id :: !UUID, changeSet :: !ChangeSet }
| DidUpdateLarge { id :: !UUID, payloadId :: !UUID }
| DidDelete { id :: !UUID }
deriving (Eq, Show)

data ChangeSet
= InlineChangeSet { changeSet :: ![Change] } -- | When the patch fits into the 8000 bytes limit of @pg_notify@
| ExternalChangeSet { largePgNotificationId :: !UUID } -- | The patch is over 8000 bytes, so we have stored it in the @large_pg_notifications@ table
deriving (Eq, Show)

data Change = Change
{ col :: !Text
, new :: !Value
Expand All @@ -39,23 +48,35 @@ createNotificationFunction table = [i|
DO $$
BEGIN
CREATE FUNCTION #{functionName}() RETURNS TRIGGER AS $BODY$
DECLARE
payload TEXT;
large_pg_notification_id UUID;
changeset JSON;
BEGIN
CASE TG_OP
WHEN 'UPDATE' THEN
SELECT json_agg(row_to_json(t))
FROM (
SELECT pre.key AS "col", post.value AS "new"
FROM jsonb_each(to_jsonb(OLD)) AS pre
CROSS JOIN jsonb_each(to_jsonb(NEW)) AS post
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value
) t INTO changeset;
payload := json_build_object(
'UPDATE', NEW.id::text,
'CHANGESET', changeset
)::text;
IF LENGTH(payload) > 7800 THEN
INSERT INTO large_pg_notifications (payload) VALUES (changeset) RETURNING id INTO large_pg_notification_id;
payload := json_build_object(
'UPDATE', NEW.id::text,
'CHANGESET', large_pg_notification_id::text
)::text;
DELETE FROM large_pg_notifications WHERE created_at < CURRENT_TIMESTAMP - interval '30s';
END IF;
PERFORM pg_notify(
'#{channelName table}',
json_build_object(
'UPDATE', NEW.id::text,
'CHANGESET', (
SELECT json_agg(row_to_json(t))
FROM (
SELECT pre.key AS "col", post.value AS "new"
FROM jsonb_each(to_jsonb(OLD)) AS pre
CROSS JOIN jsonb_each(to_jsonb(NEW)) AS post
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value
) t
)
)::text
payload
);
WHEN 'DELETE' THEN
PERFORM pg_notify(
Expand All @@ -82,7 +103,13 @@ createNotificationFunction table = [i|
EXCEPTION
WHEN duplicate_function THEN
null;


CREATE UNLOGGED TABLE IF NOT EXISTS large_pg_notifications (
id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,
payload TEXT DEFAULT null,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
CREATE INDEX IF NOT EXISTS large_pg_notifications_created_at_index ON large_pg_notifications (created_at);
END; $$
|]

Expand Down Expand Up @@ -126,12 +153,33 @@ instance FromJSON ChangeNotification where
pure $ DidUpdate id changeSet
delete values = DidDelete <$> values .: "DELETE"

instance FromJSON ChangeSet where
parseJSON array@(Array v) = do
changeSet <- parseJSON array
pure InlineChangeSet { changeSet }
parseJSON (String id) = do
case UUID.fromText id of
Just largePgNotificationId -> pure ExternalChangeSet { largePgNotificationId }
Nothing -> fail "Invalid UUID"

instance FromJSON Change where
parseJSON = withObject "Change" $ \values -> do
col <- values .: "col"
new <- values .: "new"
pure Change { .. }
-- | The @pg_notify@ function has a payload limit of 8000 bytes. When a record update is larger than the payload size
-- we store the patch in the @large_pg_notifications@ table and pass over the id to the patch.
--
-- This function retrieves the patch from the @large_pg_notifications@ table, or directly returns the patch
-- when it's less than 8000 bytes.
retrieveChanges :: (?modelContext :: ModelContext) => ChangeSet -> IO [Change]
retrieveChanges InlineChangeSet { changeSet } = pure changeSet
retrieveChanges ExternalChangeSet { largePgNotificationId } = do
(payload :: ByteString) <- sqlQueryScalar "SELECT payload FROM large_pg_notifications WHERE id = ? LIMIT 1" (PG.Only largePgNotificationId)
case eitherDecodeStrict' payload of
Left e -> fail e
Right result -> pure result

$(deriveToJSON defaultOptions 'Change)
$(deriveToJSON defaultOptions 'DidInsert)
$(deriveToJSON defaultOptions 'InlineChangeSet)
$(deriveToJSON defaultOptions 'DidInsert)
5 changes: 3 additions & 2 deletions IHP/DataSync/ControllerImpl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleC
-- E.g. if it's not matched anymore by the WHERE condition after the update
[(PG.Only isRecordInResultSet)] <- sqlQueryWithRLS ("SELECT EXISTS(SELECT * FROM (" <> theQuery <> ") AS records WHERE records.id = ? LIMIT 1)") (theParams <> [PG.toField id])
changes <- ChangeNotifications.retrieveChanges changeSet
if isRecordInResultSet
then sendJSON DidUpdate { subscriptionId, id, changeSet = changesToValue changeSet }
then sendJSON DidUpdate { subscriptionId, id, changeSet = changesToValue changes }
else sendJSON DidDelete { subscriptionId, id }
ChangeNotifications.DidDelete { id } -> do
-- Only send the notifcation if the deleted record was part of the initial
Expand Down Expand Up @@ -477,4 +478,4 @@ instance SetField "transactions" DataSyncController (HashMap UUID DataSyncTransa
setField transactions record = record { transactions }
instance SetField "asyncs" DataSyncController [Async ()] where
setField asyncs record = record { asyncs }
setField asyncs record = record { asyncs }

0 comments on commit 694ff87

Please sign in to comment.