From 6aa325a49fc684eff219229469acf7044d53d7fb Mon Sep 17 00:00:00 2001 From: Jesse Date: Wed, 23 Aug 2023 09:44:45 +0200 Subject: [PATCH] Clean up postgres snapshotter (#1895) --- .../src/connectors/postgres/snapshotter.rs | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/dozer-ingestion/src/connectors/postgres/snapshotter.rs b/dozer-ingestion/src/connectors/postgres/snapshotter.rs index 837cacc15a..485f819fd3 100644 --- a/dozer-ingestion/src/connectors/postgres/snapshotter.rs +++ b/dozer-ingestion/src/connectors/postgres/snapshotter.rs @@ -4,8 +4,8 @@ use crate::ingestion::Ingestor; use super::helper; use crate::connectors::postgres::connection::helper as connection_helper; use crate::errors::ConnectorError; +use crate::errors::PostgresConnectorError::SyncWithSnapshotError; use crate::errors::PostgresConnectorError::{InvalidQueryError, PostgresSchemaError}; -use crate::errors::PostgresConnectorError::{SnapshotReadError, SyncWithSnapshotError}; use crate::connectors::postgres::schema::helper::SchemaHelper; use crate::errors::ConnectorError::PostgresConnectorError; @@ -16,6 +16,7 @@ use dozer_types::ingestion_types::IngestionMessage; use dozer_types::types::Operation; use futures::StreamExt; use tokio::sync::mpsc::{channel, Sender}; +use tokio::task::JoinSet; pub struct PostgresSnapshotter<'a> { pub conn_config: tokio_postgres::Config, @@ -40,7 +41,7 @@ impl<'a> PostgresSnapshotter<'a> { table_name: String, table_index: usize, conn_config: tokio_postgres::Config, - sender: Sender, ConnectorError>>, + sender: Sender>, ) -> Result<(), ConnectorError> { let client_plain = connection_helper::connect(conn_config) .await @@ -72,24 +73,26 @@ impl<'a> PostgresSnapshotter<'a> { let evt = helper::map_row_to_operation_event(&msg, columns) .map_err(|e| PostgresConnectorError(PostgresSchemaError(e)))?; - sender.send(Ok(Some((table_index, evt)))).await.unwrap(); + let Ok(_) = sender.send(Ok((table_index, evt))).await else { + // If we can't send, the parent task has quit. There is + // no use in going on, but if there was an error, it was + // handled by the parent. + return Ok(()); + }; } Err(e) => return Err(PostgresConnectorError(SyncWithSnapshotError(e.to_string()))), } } - // After table read is finished, send None as message to inform receiver loop about end of table - sender.send(Ok(None)).await.unwrap(); Ok(()) } pub async fn sync_tables(&self, tables: &[ListOrFilterColumns]) -> Result<(), ConnectorError> { let schemas = self.get_tables(tables).await?; - let mut left_tables_count = tables.len(); - let (tx, mut rx) = channel(16); + let mut joinset = JoinSet::new(); for (table_index, (schema, table)) in schemas.into_iter().zip(tables).enumerate() { let schema = schema?; let schema = schema.schema; @@ -97,7 +100,7 @@ impl<'a> PostgresSnapshotter<'a> { let table_name = table.name.clone(); let conn_config = self.conn_config.clone(); let sender = tx.clone(); - tokio::spawn(async move { + joinset.spawn(async move { if let Err(e) = Self::sync_table( schema, schema_name, @@ -112,36 +115,30 @@ impl<'a> PostgresSnapshotter<'a> { } }); } + // Make sure the last sender is dropped so receiving on the channel doesn't + // deadlock + drop(tx); + let mut idx = 0; self.ingestor - .handle_message(IngestionMessage::new_snapshotting_started(0_u64, 0)) + .handle_message(IngestionMessage::new_snapshotting_started(idx, 0)) .map_err(ConnectorError::IngestorError)?; - let mut idx = 1; - loop { - let message = rx - .recv() - .await - .ok_or(PostgresConnectorError(SnapshotReadError))??; - match message { - None => { - left_tables_count -= 1; - if left_tables_count == 0 { - break; - } - } - Some((table_index, evt)) => { - self.ingestor - .handle_message(IngestionMessage::new_op(0, idx, table_index, evt)) - .map_err(ConnectorError::IngestorError)?; - idx += 1; - } - } + idx += 1; + + while let Some(message) = rx.recv().await { + let (table_index, evt) = message?; + self.ingestor + .handle_message(IngestionMessage::new_op(0, idx, table_index, evt)) + .map_err(ConnectorError::IngestorError)?; + idx += 1; } self.ingestor .handle_message(IngestionMessage::new_snapshotting_done(0_u64, idx)) .map_err(ConnectorError::IngestorError)?; + // All tasks in the joinset should have finished (because they have dropped their senders) + // Otherwise, they will be aborted when the joinset is dropped Ok(()) } }