Handling Unchanged TOAST Columns as Part of MIRROR for CDC #111

Merged: 49 commits, Jun 13, 2023

@saisrirampur saisrirampur commented Jun 7, 2023

TOAST Storage:

Postgres uses TOAST storage for large column values instead of storing them directly in the table's heap (data pages). By default, values become candidates for TOAST once a row grows beyond roughly 2 kB (heap pages are 8 kB). TOAST splits the value into multiple chunks and stores them separately, compressing it by default. The end user is not aware of this internal storage mechanism. Additional information about TOAST can be found here.

Problem:

MIRROR for CDC relies on Postgres logical decoding/replication. Logical decoding provides the operation type (INSERT, UPDATE, DELETE) and the column values for each row/tuple. However, if a column value is TOASTed and remains unchanged during a data manipulation language (DML) operation, Postgres does not provide the actual column value. Instead, the column is only flagged as an unchanged TOAST value, and the data itself stays in the table's TOAST relation. It becomes the responsibility of the client to obtain the value. This poses a challenge when normalizing raw changes into the final (normalized) table: applied naively, such a change would overwrite the unchanged TOAST column with NULL.

Solution:

  1. While reading the logical replication slot from Postgres, each unchanged TOAST column in a tuple is filled in with the last value seen for that column in the same batch. This step is performed by PullRecords.
  2. If the column value is not available in that batch, the column is added to the record's unchanged TOAST column array, which is then propagated to the raw table on the sink. PullRecords constructs the array and SyncRecords propagates it to the raw table.
  3. If a column is still marked unchanged in the last seen record for a row, its value is not updated and stays as it was before the batch was normalized. To achieve this, the UPDATE clauses of the MERGE command in NormalizeRecords are generated so that unchanged columns are left out.
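
To make step 3 more concrete, here is a minimal sketch of how one UPDATE arm of a MERGE could be generated so that unchanged TOAST columns are skipped. It is an illustration only, not the PR's actual code: the function name `UpdateClause`, the source alias `_d`, and the column name `unchanged_toast_columns` are all assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// UpdateClause builds one "WHEN MATCHED ... THEN UPDATE" arm of a MERGE for
// rows whose unchanged-TOAST marker equals unchangedCSV (a comma-separated
// column list). The alias _d and the column unchanged_toast_columns are
// hypothetical names. The sketch assumes at least one column (for example
// the primary key) is never in the unchanged set, so SET is never empty.
func UpdateClause(columns []string, unchangedCSV string) string {
	skip := map[string]bool{}
	for _, c := range strings.Split(unchangedCSV, ",") {
		if c != "" {
			skip[c] = true
		}
	}
	var sets []string
	for _, c := range columns {
		if skip[c] {
			continue // leave the sink's existing value untouched
		}
		sets = append(sets, fmt.Sprintf("%s = _d.%s", c, c))
	}
	return fmt.Sprintf(
		"WHEN MATCHED AND _d.unchanged_toast_columns = '%s' THEN UPDATE SET %s",
		unchangedCSV, strings.Join(sets, ", "))
}

func main() {
	cols := []string{"id", "name", "doc"}
	// One arm per distinct unchanged-column set observed in the batch.
	fmt.Println(UpdateClause(cols, ""))
	fmt.Println(UpdateClause(cols, "doc"))
}
```

Generating one arm per distinct set of unchanged columns keeps the statement a single MERGE while still leaving each row's untouched TOAST values as they were.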

Step 1 can be a significant optimization for workloads that frequently insert and update large values in the same row, which is typical of IoT and NoSQL-like workloads. PeerDB controls the population of unchanged column values per table based on the largest value seen in that batch.
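
Steps 1 and 2 can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in PullRecords: the `Record` type, its fields, and `BackfillBatch` are hypothetical, and values are plain strings keyed by a single primary-key string.

```go
package main

import "fmt"

// Record is a hypothetical, simplified change record; the real types differ.
type Record struct {
	PrimaryKey string
	Items      map[string]string // column -> value decoded from the stream
	Unchanged  map[string]bool   // columns flagged as unchanged TOAST
}

// BackfillBatch fills unchanged TOAST columns from the last value seen for the
// same primary key earlier in the batch (step 1). Columns that cannot be
// filled stay in Unchanged so they can be propagated to the raw table (step 2).
func BackfillBatch(batch []Record) {
	lastSeen := map[string]map[string]string{} // pk -> column -> value
	for i := range batch {
		rec := &batch[i]
		if rec.Items == nil {
			rec.Items = map[string]string{}
		}
		seen := lastSeen[rec.PrimaryKey]
		if seen == nil {
			seen = map[string]string{}
			lastSeen[rec.PrimaryKey] = seen
		}
		for col := range rec.Unchanged {
			if v, ok := seen[col]; ok {
				rec.Items[col] = v
				delete(rec.Unchanged, col) // deleting while ranging is safe in Go
			}
		}
		// Remember every value now known for this row, including backfilled ones.
		for col, v := range rec.Items {
			seen[col] = v
		}
	}
}

func main() {
	batch := []Record{
		{PrimaryKey: "1", Items: map[string]string{"id": "1", "doc": "big-value"}},
		{PrimaryKey: "1", Items: map[string]string{"id": "1"}, Unchanged: map[string]bool{"doc": true}},
		{PrimaryKey: "2", Items: map[string]string{"id": "2"}, Unchanged: map[string]bool{"doc": true}},
	}
	BackfillBatch(batch)
	for _, r := range batch {
		fmt.Println(r.PrimaryKey, r.Items["doc"], len(r.Unchanged))
	}
}
```

In this sketch the second record for row 1 picks up the value from the first, while row 2 has no earlier value in the batch and keeps its column marked as unchanged.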


revert this file

@PeerDB-io PeerDB-io deleted a comment from iskakaushik Jun 12, 2023
@@ -388,3 +367,1097 @@ func (s *E2EPeerFlowTestSuite) Test_Complete_Simple_Flow() {

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_BQ() {

🚫 [golangci] reported by reviewdog 🐶
371-440 lines are duplicate of e2e/peer_flow_test.go:442-505 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Nochanges_BQ() {

🚫 [golangci] reported by reviewdog 🐶
442-505 lines are duplicate of e2e/peer_flow_test.go:507-581 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_1_BQ() {

🚫 [golangci] reported by reviewdog 🐶
507-581 lines are duplicate of e2e/peer_flow_test.go:610-678 (dupl)

}
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_2_BQ() {

🚫 [golangci] reported by reviewdog 🐶
610-678 lines are duplicate of e2e/peer_flow_test.go:680-748 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_3_BQ() {

🚫 [golangci] reported by reviewdog 🐶
680-748 lines are duplicate of e2e/peer_flow_test.go:371-440 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_1_SF() {

🚫 [golangci] reported by reviewdog 🐶
1100-1175 lines are duplicate of e2e/peer_flow_test.go:1177-1246 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_2_SF() {

🚫 [golangci] reported by reviewdog 🐶
1177-1246 lines are duplicate of e2e/peer_flow_test.go:1248-1317 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_3_SF() {

🚫 [golangci] reported by reviewdog 🐶
1248-1317 lines are duplicate of e2e/peer_flow_test.go:962-1032 (dupl)

@@ -388,3 +370,1097 @@ func (s *E2EPeerFlowTestSuite) Test_Complete_Simple_Flow() {

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_BQ() {

🚫 [golangci] reported by reviewdog 🐶
374-443 lines are duplicate of e2e/peer_flow_test.go:445-508 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Nochanges_BQ() {

🚫 [golangci] reported by reviewdog 🐶
445-508 lines are duplicate of e2e/peer_flow_test.go:510-584 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_1_BQ() {

🚫 [golangci] reported by reviewdog 🐶
510-584 lines are duplicate of e2e/peer_flow_test.go:613-681 (dupl)

}
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_2_BQ() {

🚫 [golangci] reported by reviewdog 🐶
613-681 lines are duplicate of e2e/peer_flow_test.go:683-751 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_3_BQ() {

🚫 [golangci] reported by reviewdog 🐶
683-751 lines are duplicate of e2e/peer_flow_test.go:374-443 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Nochanges_SF() {

🚫 [golangci] reported by reviewdog 🐶
1037-1320 lines are duplicate of e2e/peer_flow_test.go:965-1249 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_1_SF() {

🚫 [golangci] reported by reviewdog 🐶
1103-1178 lines are duplicate of e2e/peer_flow_test.go:1180-1249 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_2_SF() {

🚫 [golangci] reported by reviewdog 🐶
1180-1249 lines are duplicate of e2e/peer_flow_test.go:1251-1320 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_3_SF() {

🚫 [golangci] reported by reviewdog 🐶
1251-1320 lines are duplicate of e2e/peer_flow_test.go:965-1035 (dupl)

s.sfHelper = sfHelper

// for every test, drop the _PEERDB_INTERNAL schema
s.sfHelper.client.DropSchema("_PEERDB_INTERNAL")

🚫 [golangci] reported by reviewdog 🐶
Error return value of s.sfHelper.client.DropSchema is not checked (errcheck)
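
One way to address this errcheck finding is to check the returned error and surface it from the setup code. The sketch below is illustrative only: the `schemaDropper` interface, `resetInternalSchema`, and `fakeClient` are hypothetical stand-ins, and the only thing taken from the thread is that `DropSchema` returns an error.

```go
package main

import (
	"errors"
	"fmt"
)

// schemaDropper is a hypothetical stand-in for the Snowflake test client.
type schemaDropper interface {
	DropSchema(name string) error
}

// resetInternalSchema drops the internal schema and reports any failure
// instead of silently discarding the error.
func resetInternalSchema(c schemaDropper) error {
	if err := c.DropSchema("_PEERDB_INTERNAL"); err != nil {
		return fmt.Errorf("failed to drop _PEERDB_INTERNAL schema: %w", err)
	}
	return nil
}

// fakeClient lets the sketch run without a real warehouse connection.
type fakeClient struct{ err error }

func (f fakeClient) DropSchema(string) error { return f.err }

var errBoom = errors.New("boom")

func main() {
	fmt.Println(resetInternalSchema(fakeClient{}))
	fmt.Println(resetInternalSchema(fakeClient{err: errBoom}))
}
```

In a test suite setup, the returned error would typically fail the test immediately rather than be printed.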

@saisrirampur saisrirampur requested a review from iskakaushik June 13, 2023 19:00
matchData: "",
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString
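
The linter reports that the helper was not defined at this revision, and its real definition is not shown in this thread. For readers following along, a helper with this name might look roughly like the sketch below; the `map[string]bool` parameter type, the comma separator, and the sorted ordering are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// KeysToString joins the keys of m with commas. Keys are sorted so that the
// same set of columns always produces the same string, which matters if the
// result is later compared for equality. (Hypothetical sketch; the actual
// helper in the PR may differ.)
func KeysToString(m map[string]bool) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return strings.Join(keys, ",")
}

func main() {
	fmt.Println(KeysToString(map[string]bool{"doc": true, "blob": true}))
	fmt.Println(KeysToString(nil) == "")
}
```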

matchData: string(oldItemsJSON),
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString

matchData: string(itemsJSON),
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString) (typecheck)

matchData: "",
batchID: syncBatchID,
items: typedRecord.Items,
unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString

matchData: string(oldItemsJSON),
batchID: syncBatchID,
items: typedRecord.NewItems,
unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString

matchData: string(oldItemsJSON),
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString

matchData: string(itemsJSON),
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString) (typecheck)

matchData: "",
batchID: syncBatchID,
items: typedRecord.Items,
unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString

matchData: string(oldItemsJSON),
batchID: syncBatchID,
items: typedRecord.NewItems,
unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString

matchData: string(itemsJSON),
batchID: syncBatchID,
items: typedRecord.Items,
unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),

🚫 [golangci] reported by reviewdog 🐶
undefined: utils.KeysToString) (typecheck)

@@ -388,3 +381,1097 @@ func (s *E2EPeerFlowTestSuite) Test_Complete_Simple_Flow() {

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_BQ() {

🚫 [golangci] reported by reviewdog 🐶
385-454 lines are duplicate of e2e/peer_flow_test.go:456-519 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Nochanges_BQ() {

🚫 [golangci] reported by reviewdog 🐶
456-519 lines are duplicate of e2e/peer_flow_test.go:521-595 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_1_BQ() {

🚫 [golangci] reported by reviewdog 🐶
521-595 lines are duplicate of e2e/peer_flow_test.go:624-692 (dupl)

}
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_2_BQ() {

🚫 [golangci] reported by reviewdog 🐶
624-692 lines are duplicate of e2e/peer_flow_test.go:694-762 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_3_BQ() {

🚫 [golangci] reported by reviewdog 🐶
694-762 lines are duplicate of e2e/peer_flow_test.go:385-454 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_SF() {

🚫 [golangci] reported by reviewdog 🐶
976-1260 lines are duplicate of e2e/peer_flow_test.go:1048-1331 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Nochanges_SF() {

🚫 [golangci] reported by reviewdog 🐶
1048-1331 lines are duplicate of e2e/peer_flow_test.go:976-1260 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_1_SF() {

🚫 [golangci] reported by reviewdog 🐶
1114-1189 lines are duplicate of e2e/peer_flow_test.go:1191-1260 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_2_SF() {

🚫 [golangci] reported by reviewdog 🐶
1191-1260 lines are duplicate of e2e/peer_flow_test.go:1262-1331 (dupl)

env.AssertExpectations(s.T())
}

func (s *E2EPeerFlowTestSuite) Test_Toast_Advance_3_SF() {

🚫 [golangci] reported by reviewdog 🐶
1262-1331 lines are duplicate of e2e/peer_flow_test.go:976-1046 (dupl)

@iskakaushik iskakaushik enabled auto-merge (squash) June 13, 2023 21:50
@iskakaushik iskakaushik merged commit adfcaf1 into main Jun 13, 2023
@serprex serprex deleted the toast-new branch December 25, 2023 21:07
@unoexperto
@iskakaushik @saisrirampur Hi folks. Sorry for reviving an old thread, but I'm curious about something. Unfortunately I'm not familiar with Go, so it's hard for me to parse the 15 changed files.

So let's say the old value of a TOAST column is not available in the current batch, and you're parsing a WAL stream that looks like this:

....
....
UPDATE TableA set toast_col = toast_value_1
UPDATE TableA set toast_col = toast_value_2

Let's say you're processing UPDATE TableA set toast_col = toast_value_1 right now and you see that you need to backfill toast_col. Will you do it by simply running select toast_col from TableA where id = ...? But in that case your select would return the value already written by the second update, which you haven't processed yet.

Do I understand it correctly?
