Skip to content

Commit

Permalink
Output partial schema and record when TOAST columns are missing
Browse files (browse the repository at this point in the history)
  • Loading branch information
mvgijssel committed Jan 13, 2022
1 parent a073f5c commit acd0b0f
Showing 1 changed file with 39 additions and 2 deletions.
41 changes: 39 additions & 2 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,45 @@ def consume_message(streams, state, msg, time_extracted, conn_info):
stream_md_map,
conn_info)

singer.write_message(record_message)
state = singer.write_bookmark(state, target_stream['tap_stream_id'], 'lsn', lsn)
toast_columns = set()

if payload["kind"] in {"update"}:
desired_columns_without_automatic_columns = [
column for column in desired_columns if not column.startswith("_sdc")
]

toast_columns = set(desired_columns_without_automatic_columns).difference(
payload["columnnames"]
)

if toast_columns:
LOGGER.info(
"Found toast columns %s for stream %s",
toast_columns,
target_stream["tap_stream_id"],
)

LOGGER.info("Original stream: %s", json.dumps(target_stream))

modified_target_stream = copy.deepcopy(target_stream)

for col in toast_columns:
modified_target_stream["schema"]["properties"].pop(col)

LOGGER.info("Modified stream: %s", json.dumps(modified_target_stream))

sync_common.send_schema_message(modified_target_stream, ["lsn"])

singer.write_message(record_message)

# Reset the schema to the original one to avoid subsequent records missing columns
sync_common.send_schema_message(target_stream, ["lsn"])
else:
singer.write_message(record_message)

state = singer.write_bookmark(
state, target_stream["tap_stream_id"], "lsn", lsn
)

return state

Expand Down

0 comments on commit acd0b0f

Please sign in to comment.