s3 sources: Add the ability to read a single object from S3 #5194
Conversation
Exciting!
    bucket: String,
    /// A glob-like pattern that objects must match
    objects_pattern: String,
},
FYI, we're trying to move away from these sorts of comments in the SQL parser. The principle is that descriptions of the semantics of S3 sources don't really belong in doc comments in the SQL parser. If anything these could describe the syntax they represent ("The value of the `BUCKET` clause of the `S3` connector."), but in general I think those comments clutter things up more than they help. The `syn` crate is a nice example of describing just an AST without describing the semantics of the language.
How would you feel about:
A glob-like object specifier: `'a/**/*.json'`
This seems to match what happens in syn, in e.g. Item: https://docs.rs/syn/1.0.57/syn/enum.Item.html
I still think that's a bit too specific since the syntax of that pattern is opaque to the SQL parser. Like I think the correct level of detail for these doc comments is something like
The `S3 ...` connector.
S3 {
/// The `BUCKET 'bucket'` clause.
bucket: String,
/// The `OBJECTS 'pattern'` clause.
object_patterns: String,
}
And then the semantics of these things would be fully described in the user-facing docs, so that we focus our documentation efforts on what users will actually see. But again this is super minor so 💯 down to roll with what you have!
src/dataflow-types/src/types.rs (outdated)

impl ExternalSourceConnector {
    /// Metadata columns reflect how many records we have processed
If we're going to document this method, then I think it would be great to be a bit more detailed! Perhaps something like this:
/// Returns the name and type of each additional metadata column that
/// Materialize will automatically append to the source's inherent columns.
///
/// Presently, each source type exposes precisely one metadata column that
/// corresponds to some source-specific record counter. For example, file
/// sources use a line number, while Kafka sources use a topic offset.
///
/// The columns declared here must be kept in sync with the actual source
/// implementations that produce these columns.
Thanks, this is exactly what I had wished was there when I was trying to read this.
src/dataflow-types/src/types.rs (outdated)

pub bucket: String,
/// Used to filter results
#[serde(with = "s3_serde_glob")]
pub objects_pattern: glob::Pattern,
What do you say to using BurntSushi's `globset` crate instead? It's both supposed to be faster and comes with built-in serde support if you enable the `serde1` feature!
Ah, I looked for it but could only find a crate that actually did stuff on the filesystem, will switch.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AwsConnectInfo {
I think this might as well be named `AwsConnector` for parity with the other types in this module.
This is slightly different from a connector, though: it's a subset of the information required by connectors, and it's not enough information by itself to create a connector, so I felt that difference was worth preserving. I agree that `AwsConnectInfo` is a bad name, but I couldn't think of anything better.
I have to admit I'm unconvinced! The term "connector" was never meant to mean "complete description"; it was picked specifically to avoid the noise word "info". And indeed an `ExternalSourceConnector` is not actually a complete description of what you need to create an external source: there are a bunch of other fields in `SourceConnector::External`.
Anyway, minor, let's definitely not hold up this PR on this point!
src/sql/src/plan/statement.rs (outdated)

None => extract("region")?
    .map(|r| r.parse())
    .transpose()?
    // TODO: do we want to have a default region?
In my past experiences with AWS, I've always regretted any attempt to automatically infer the region or use a default. It's ugly for sure, but folks who are used to AWS seem very used to the idea that an AWS connection is a verbose affair that requires all three of (access key id, secret access key, region).
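The "require all three explicitly, no defaults" approach can be sketched as a simple eager validation pass. This is a hedged illustration of the idea only; the function and parameter names below are hypothetical, not Materialize's actual API:

```rust
// Hypothetical sketch: reject a connection unless the user explicitly
// supplied all three AWS parameters, rather than inferring defaults.
fn validate_aws_params(
    access_key_id: Option<&str>,
    secret_access_key: Option<&str>,
    region: Option<&str>,
) -> Result<(), String> {
    let required = [
        ("access key id", access_key_id),
        ("secret access key", secret_access_key),
        ("region", region),
    ];
    for (name, value) in required.iter() {
        if value.is_none() {
            // Fail eagerly with a specific message instead of guessing.
            return Err(format!("missing required AWS parameter: {}", name));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_aws_params(Some("AKIA..."), Some("secret"), None).is_err());
    assert!(validate_aws_params(Some("AKIA..."), Some("secret"), Some("us-east-1")).is_ok());
}
```

The upside of this verbosity is exactly the one described above: misconfiguration surfaces at `CREATE SOURCE` time with a clear message, not later with a surprising default region.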
fair, removed this todo.
src/dataflow/src/source/s3.rs (outdated)

// Helper utilities

/// Iterate over a `Vec<u8>`, yielding new Vecs newline-separated
struct VecLinesIter {
Can you not use something like `slice::split('\n').map(|s| s.to_vec())`?
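For reference, the std version of this is a one-liner, with the small caveat that `slice::split` takes a predicate rather than a byte. A self-contained sketch (the helper name is just for illustration):

```rust
/// Split a byte buffer into newline-separated owned records,
/// replacing a hand-rolled iterator with std's `slice::split`.
/// Note: `slice::split` takes a predicate, not a byte value.
fn split_lines(data: &[u8]) -> Vec<Vec<u8>> {
    data.split(|&b| b == b'\n').map(|s| s.to_vec()).collect()
}

fn main() {
    let lines = split_lines(b"record1\nrecord2\nrecord3");
    assert_eq!(lines.len(), 3);
    assert_eq!(lines[0], b"record1");
}
```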
gah, yes.
Force-pushed from b2dd11f to f69e626.
@benesch I believe that this is entirely up to date re: your comments.
Everything I've looked at looks great, but I didn't look at the important bits—I think @umanwizard is doing that in the other PR?
Force-pushed from c8fc7f9 to 906997b.
This will allow it to be shared between different services in later commits.
This is a more common idiom.
Before this, credentials weren't fetched until we tried to use the client, causing credential errors to show up well after the dataflow had been created. With this change, we can fail to create the source up front instead.
Individual objects are downloaded and then split on newlines, with individual records being sent through to the dataflow layer.

There are some open questions about how MzOffsets and other things should map into an S3 world: "partition" can be mapped to S3 objects, but we can grow to arbitrarily large numbers of S3 objects, so a naive implementation of that won't necessarily make sense.

This commit still requires testing infrastructure, but ingesting an S3 object works correctly.
Force-pushed from 906997b to 7117745.
I'm going to close this to centralize discussion on #5202, which includes this commit.
Individual objects are downloaded and then split on newlines, with individual records sent through to the dataflow layer.
There are some open questions about how MzOffsets and other things should map
into an S3 world -- "partition" can be mapped to S3 objects, but we can grow to
arbitrarily large numbers of S3 objects, so a naive implementation of that
won't necessarily make sense.
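The naive object-per-partition mapping mentioned above can be sketched in a few lines, which also makes the scaling concern concrete. This is speculative illustration only; all names below are hypothetical and this is not Materialize's design:

```rust
use std::collections::HashMap;

/// Speculative sketch: treat each S3 object key as its own "partition",
/// with a per-object record offset. The concern raised above is visible
/// in the data structure itself: the map grows with the number of objects.
#[derive(Default)]
struct S3OffsetMap {
    partitions: HashMap<String, u64>, // object key -> next record offset
}

impl S3OffsetMap {
    /// Record one ingested record from `key`, returning its offset
    /// within that object's "partition".
    fn next_offset(&mut self, key: &str) -> u64 {
        let offset = self.partitions.entry(key.to_string()).or_insert(0);
        let current = *offset;
        *offset += 1;
        current
    }
}

fn main() {
    let mut map = S3OffsetMap::default();
    assert_eq!(map.next_offset("a/1.json"), 0);
    assert_eq!(map.next_offset("a/1.json"), 1);
    assert_eq!(map.next_offset("b/2.json"), 0);
    println!("tracked objects: {}", map.partitions.len());
}
```

With unbounded object counts, this per-object state is exactly what "won't necessarily make sense" at scale, which is why the mapping question is left open here.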
This PR still requires testing infrastructure, but ingesting an S3 object
works correctly. This should be reviewed commit-by-commit.
Part of #4914