The core row-processing loop (iterate columns, extract values, build JSON payload, construct ProducedMessage) is duplicated in three places within postgres_source/src/lib.rs (2,394 lines):
- process_rows (sequential polling path)
- Inline closure inside poll_tables_parallel (non-chunked parallel path)
- poll_table_chunked (chunked parallel path)
Every new feature (like flat_json_output) must be wired into all three. Every bug fix must be applied three times. During the benchmark, adding flat JSON output required editing three separate blocks of nearly identical code.
Fix: Extract the row-to-message conversion into a single shared function that all three paths call. The function signature would be roughly:
fn rows_to_messages(rows, table_name, config_flags, ...) -> Result<Vec<ProducedMessage>, Error>
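
A minimal sketch of what that shared function could look like, using only the standard library. Everything here other than the names rows_to_messages, ProducedMessage, and flat_json_output is a hypothetical stand-in: the Row, ColumnValue, RowConfig, and Error types, the hand-rolled JSON helpers, and in particular the assumption that the non-flat mode wraps each row in a {"table": ..., "data": ...} envelope. The real connector types and payload layout in lib.rs will differ.

```rust
// Hypothetical stand-ins for the connector's real row and message types.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnValue {
    Null,
    Int(i64),
    Text(String),
}

pub struct Row {
    pub columns: Vec<(String, ColumnValue)>,
}

// Feature flags live in one struct so a new flag is wired in exactly once.
#[derive(Debug, Clone, Copy, Default)]
pub struct RowConfig {
    pub flat_json_output: bool,
}

#[derive(Debug, PartialEq)]
pub struct ProducedMessage {
    pub payload: String,
}

// Hypothetical error type; the real one would cover value-extraction failures.
#[derive(Debug, PartialEq)]
pub enum Error {
    EmptyRow { table: String },
}

// Escape a string for embedding inside a JSON string literal.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

fn value_to_json(value: &ColumnValue) -> String {
    match value {
        ColumnValue::Null => "null".to_string(),
        ColumnValue::Int(i) => i.to_string(),
        ColumnValue::Text(s) => format!("\"{}\"", escape_json(s)),
    }
}

// Single shared conversion: iterate columns, extract values, build the JSON
// payload, construct ProducedMessage. All three polling paths would call this.
pub fn rows_to_messages(
    rows: &[Row],
    table_name: &str,
    config: RowConfig,
) -> Result<Vec<ProducedMessage>, Error> {
    let mut messages = Vec::with_capacity(rows.len());
    for row in rows {
        if row.columns.is_empty() {
            return Err(Error::EmptyRow { table: table_name.to_string() });
        }
        let fields: Vec<String> = row
            .columns
            .iter()
            .map(|(name, value)| format!("\"{}\":{}", escape_json(name), value_to_json(value)))
            .collect();
        let object = format!("{{{}}}", fields.join(","));
        // Assumed semantics: flat mode emits the bare object, otherwise an envelope.
        let payload = if config.flat_json_output {
            object
        } else {
            format!("{{\"table\":\"{}\",\"data\":{}}}", escape_json(table_name), object)
        };
        messages.push(ProducedMessage { payload });
    }
    Ok(messages)
}
```

With this shape, process_rows, the closure in poll_tables_parallel, and poll_table_chunked would each reduce to a single call such as rows_to_messages(&rows, table_name, config)?, and a flag like flat_json_output is handled in one branch instead of three.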