Skip to content

Commit

Permalink
move postgres SimpleSource to SourceReader
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn committed Apr 27, 2022
1 parent 41612ad commit 79f16a5
Show file tree
Hide file tree
Showing 4 changed files with 521 additions and 312 deletions.
39 changes: 22 additions & 17 deletions src/storage/src/render/sources.rs
Expand Up @@ -43,11 +43,13 @@ use mz_timely_util::operator::{CollectionExt, StreamExt};
///
/// This enum puts no restrictions to the generic parameters of the variants since it only serves
/// as a type-level enum.
enum SourceType<Delimited, ByteStream> {
enum SourceType<Delimited, ByteStream, RowSource> {
/// A delimited source
Delimited(Delimited),
/// A bytestream source
ByteStream(ByteStream),
/// A source that produces Row's natively
Row(RowSource),
}

/// A description of a table imported by [`render_table`].
Expand Down Expand Up @@ -280,21 +282,6 @@ where
.as_collection(),
);

(ok_stream.as_collection(), capability)
} else if let ExternalSourceConnector::Postgres(pg_connector) = connector {
let source =
PostgresSourceReader::new(uid, pg_connector, source_config.base_metrics);

let ((ok_stream, err_stream), capability) =
source::create_source_simple(source_config, source);

error_collections.push(
err_stream
.map(DataflowError::SourceError)
.pass_through("source-errors", 1)
.as_collection(),
);

(ok_stream.as_collection(), capability)
} else if let ExternalSourceConnector::Persist(_) = connector {
unreachable!("persist/STORAGE sources cannot be rendered in a storage instance")
Expand Down Expand Up @@ -335,7 +322,14 @@ where
);
((SourceType::ByteStream(ok), err), cap)
}
ExternalSourceConnector::Postgres(_) => unreachable!(),
ExternalSourceConnector::Postgres(_) => {
let ((ok, err), cap) = source::create_source::<_, PostgresSourceReader>(
source_config,
&connector,
storage_state.aws_external_id.clone(),
);
((SourceType::Row(ok), err), cap)
}
ExternalSourceConnector::PubNub(_) => unreachable!(),
ExternalSourceConnector::Persist(_) => unreachable!(),
};
Expand Down Expand Up @@ -398,6 +392,17 @@ where
&mut linear_operators,
storage_state.decode_metrics.clone(),
),
SourceType::Row(source) => (
source.map(|r| DecodeResult {
key: None,
value: Some(Ok((r.value, r.diff))),
position: r.position,
upstream_time_millis: r.upstream_time_millis,
partition: r.partition,
metadata: Row::default(),
}),
None,
),
};
if let Some(tok) = extra_token {
needed_tokens.push(Arc::new(tok));
Expand Down
10 changes: 10 additions & 0 deletions src/storage/src/source/mod.rs
Expand Up @@ -361,6 +361,16 @@ impl MaybeLength for Vec<u8> {
}
}

impl MaybeLength for mz_repr::Row {
fn len(&self) -> Option<usize> {
Some(self.data().len())
}

fn is_empty(&self) -> bool {
self.data().is_empty()
}
}

impl MaybeLength for Value {
// Not possible to compute a size in bytes without recursively traversing the entire tree.
fn len(&self) -> Option<usize> {
Expand Down

0 comments on commit 79f16a5

Please sign in to comment.