Skip to content

Commit

Permalink
Clean up postgres snapshotter (#1895)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jesse-Bakker committed Aug 23, 2023
1 parent d3d4152 commit 6aa325a
Showing 1 changed file with 26 additions and 29 deletions.
55 changes: 26 additions & 29 deletions dozer-ingestion/src/connectors/postgres/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::ingestion::Ingestor;
use super::helper;
use crate::connectors::postgres::connection::helper as connection_helper;
use crate::errors::ConnectorError;
use crate::errors::PostgresConnectorError::SyncWithSnapshotError;
use crate::errors::PostgresConnectorError::{InvalidQueryError, PostgresSchemaError};
use crate::errors::PostgresConnectorError::{SnapshotReadError, SyncWithSnapshotError};

use crate::connectors::postgres::schema::helper::SchemaHelper;
use crate::errors::ConnectorError::PostgresConnectorError;
Expand All @@ -16,6 +16,7 @@ use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::types::Operation;
use futures::StreamExt;
use tokio::sync::mpsc::{channel, Sender};
use tokio::task::JoinSet;

pub struct PostgresSnapshotter<'a> {
pub conn_config: tokio_postgres::Config,
Expand All @@ -40,7 +41,7 @@ impl<'a> PostgresSnapshotter<'a> {
table_name: String,
table_index: usize,
conn_config: tokio_postgres::Config,
sender: Sender<Result<Option<(usize, Operation)>, ConnectorError>>,
sender: Sender<Result<(usize, Operation), ConnectorError>>,
) -> Result<(), ConnectorError> {
let client_plain = connection_helper::connect(conn_config)
.await
Expand Down Expand Up @@ -72,32 +73,34 @@ impl<'a> PostgresSnapshotter<'a> {
let evt = helper::map_row_to_operation_event(&msg, columns)
.map_err(|e| PostgresConnectorError(PostgresSchemaError(e)))?;

sender.send(Ok(Some((table_index, evt)))).await.unwrap();
let Ok(_) = sender.send(Ok((table_index, evt))).await else {
// If we can't send, the parent task has quit. There is
// no use in going on, but if there was an error, it was
// handled by the parent.
return Ok(());
};
}
Err(e) => return Err(PostgresConnectorError(SyncWithSnapshotError(e.to_string()))),
}
}

// After table read is finished, send None as message to inform receiver loop about end of table
sender.send(Ok(None)).await.unwrap();
Ok(())
}

pub async fn sync_tables(&self, tables: &[ListOrFilterColumns]) -> Result<(), ConnectorError> {
let schemas = self.get_tables(tables).await?;

let mut left_tables_count = tables.len();

let (tx, mut rx) = channel(16);

let mut joinset = JoinSet::new();
for (table_index, (schema, table)) in schemas.into_iter().zip(tables).enumerate() {
let schema = schema?;
let schema = schema.schema;
let schema_name = table.schema.clone().unwrap_or("public".to_string());
let table_name = table.name.clone();
let conn_config = self.conn_config.clone();
let sender = tx.clone();
tokio::spawn(async move {
joinset.spawn(async move {
if let Err(e) = Self::sync_table(
schema,
schema_name,
Expand All @@ -112,36 +115,30 @@ impl<'a> PostgresSnapshotter<'a> {
}
});
}
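// Drop the original sender so the channel closes once every spawned task has
// dropped its clone; otherwise `rx.recv()` would never return `None` and the
// receive loop below would wait forever.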
drop(tx);

let mut idx = 0;
self.ingestor
.handle_message(IngestionMessage::new_snapshotting_started(0_u64, 0))
.handle_message(IngestionMessage::new_snapshotting_started(idx, 0))
.map_err(ConnectorError::IngestorError)?;
let mut idx = 1;
loop {
let message = rx
.recv()
.await
.ok_or(PostgresConnectorError(SnapshotReadError))??;
match message {
None => {
left_tables_count -= 1;
if left_tables_count == 0 {
break;
}
}
Some((table_index, evt)) => {
self.ingestor
.handle_message(IngestionMessage::new_op(0, idx, table_index, evt))
.map_err(ConnectorError::IngestorError)?;
idx += 1;
}
}
idx += 1;

while let Some(message) = rx.recv().await {
let (table_index, evt) = message?;
self.ingestor
.handle_message(IngestionMessage::new_op(0, idx, table_index, evt))
.map_err(ConnectorError::IngestorError)?;
idx += 1;
}

self.ingestor
.handle_message(IngestionMessage::new_snapshotting_done(0_u64, idx))
.map_err(ConnectorError::IngestorError)?;

// All tasks in the joinset should have finished by now, because they have dropped their senders.
// Any task that has not finished will be aborted when the joinset is dropped.
Ok(())
}
}
Expand Down

0 comments on commit 6aa325a

Please sign in to comment.