s3 sources: Add the ability to read a single object from S3
Individual objects are downloaded and then split on newlines, with the
resulting records sent through to the dataflow layer.

There are some open questions about how MzOffsets and related concepts should
map into an S3 world: "partition" can be mapped to individual S3 objects, but
the number of objects can grow arbitrarily large, so a naive implementation of
that mapping won't necessarily make sense.

This commit still requires testing infrastructure, but ingesting an S3 object
works correctly.

Part of #4914
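
For illustration only, here is a minimal sketch of the download-and-split flow
described above, using rusoto_s3 (which this commit adds as a dependency). The
function name `read_object_lines` and the collect-into-a-Vec shape are
hypothetical; the real implementation streams records into the dataflow layer
rather than collecting them.

use anyhow::Context;
use rusoto_s3::{GetObjectRequest, S3, S3Client};
use tokio::io::AsyncReadExt;

async fn read_object_lines(
    client: &S3Client,
    bucket: &str,
    key: &str,
) -> Result<Vec<String>, anyhow::Error> {
    // Download one object from S3.
    let output = client
        .get_object(GetObjectRequest {
            bucket: bucket.to_string(),
            key: key.to_string(),
            ..Default::default()
        })
        .await
        .context("fetching S3 object")?;

    // Read the streaming body into memory, then split it on newlines; each
    // line becomes one record for the dataflow layer.
    let mut buf = Vec::new();
    output
        .body
        .context("S3 object had no body")?
        .into_async_read()
        .read_to_end(&mut buf)
        .await?;

    Ok(String::from_utf8_lossy(&buf)
        .lines()
        .map(|line| line.to_string())
        .collect())
}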
quodlibetor committed Jan 5, 2021
1 parent 7ba8157 commit f69e626
Showing 17 changed files with 529 additions and 6 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

(Generated file; diff not shown.)

1 change: 1 addition & 0 deletions src/aws-util/Cargo.toml
@@ -11,5 +11,6 @@ log = "0.4.0"
mz_rusoto_core = "0.46.0"
mz_rusoto_credential = "0.46.0"
mz_rusoto_kinesis = "0.46.0"
mz_rusoto_s3 = "0.46.0"
mz_rusoto_sts = "0.46.0"
tokio = "0.2.0"
1 change: 1 addition & 0 deletions src/aws-util/src/lib.rs
@@ -13,3 +13,4 @@

pub mod aws;
pub mod kinesis;
pub mod s3;
64 changes: 64 additions & 0 deletions src/aws-util/src/s3.rs
@@ -0,0 +1,64 @@
// Copyright Materialize, Inc. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utility functions for AWS S3 clients

use std::time::Duration;

use anyhow::Context;
use log::info;
use rusoto_core::{HttpClient, Region};
use rusoto_credential::{
    AutoRefreshingProvider, ChainProvider, ProvideAwsCredentials, StaticProvider,
};
use rusoto_s3::S3Client;

/// Construct an S3Client
///
/// If a static access key ID and secret access key are not provided, falls
/// back to using credentials gathered by rusoto's [`ChainProvider`] wrapped in
/// an [`AutoRefreshingProvider`].
///
/// The [`AutoRefreshingProvider`] caches the underlying provider's AWS credentials,
/// automatically fetching updated credentials if they've expired.
pub async fn client(
    region: Region,
    access_key_id: Option<String>,
    secret_access_key: Option<String>,
    token: Option<String>,
) -> Result<S3Client, anyhow::Error> {
    let request_dispatcher = HttpClient::new().context("creating HTTP client for S3 client")?;
    let s3_client = match (access_key_id, secret_access_key) {
        (Some(access_key_id), Some(secret_access_key)) => {
            info!("Creating a new S3 client from provided access_key and secret_access_key");
            S3Client::new_with(
                request_dispatcher,
                StaticProvider::new(access_key_id, secret_access_key, token, None),
                region,
            )
        }
        (None, None) => {
            info!(
                "AWS access_key_id and secret_access_key not provided, \
                 creating a new S3 client using a chain provider."
            );
            let mut provider = ChainProvider::new();
            provider.set_timeout(Duration::from_secs(10));
            provider.credentials().await?; // ensure that credentials exist
            let provider =
                AutoRefreshingProvider::new(provider).context("generating AWS credentials")?;

            S3Client::new_with(request_dispatcher, provider, region)
        }
        (_, _) => anyhow::bail!(
            "access_key_id and secret_access_key must either both be provided, or neither"
        ),
    };
    Ok(s3_client)
}
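
A hypothetical usage sketch for the helper above; the choice of region and the
reliance on the ambient credential chain (no static keys) are assumptions, not
part of this commit.

use rusoto_core::Region;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // No static credentials supplied, so the chain-provider path is taken.
    let s3 = aws_util::s3::client(Region::UsEast1, None, None, None).await?;
    // `s3` is a rusoto_s3::S3Client, ready for ListObjectsV2 / GetObject calls.
    drop(s3);
    Ok(())
}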
24 changes: 22 additions & 2 deletions src/coord/src/timestamp.rs
@@ -38,8 +38,8 @@ use dataflow::source::read_file_task;
use dataflow::source::FileReadStyle;
use dataflow_types::{
    AvroOcfEncoding, Consistency, DataEncoding, Envelope, ExternalSourceConnector,
    FileSourceConnector, KafkaSourceConnector, KinesisSourceConnector, MzOffset, SourceConnector,
    TimestampSourceUpdate,
    FileSourceConnector, KafkaSourceConnector, KinesisSourceConnector, MzOffset, S3SourceConnector,
    SourceConnector, TimestampSourceUpdate,
};
use expr::{PartitionId, SourceInstanceId};
use ore::collections::CollectionExt;
@@ -134,13 +134,15 @@ enum RtTimestampConnector {
    File(RtFileConnector),
    Ocf(RtFileConnector),
    Kinesis(RtKinesisConnector),
    S3(RtS3Connector),
}

enum ByoTimestampConnector {
    Kafka(ByoKafkaConnector),
    File(ByoFileConnector<Vec<u8>, anyhow::Error>),
    Ocf(ByoFileConnector<Value, anyhow::Error>),
    Kinesis(ByoKinesisConnector),
    // S3 is not supported
}

// List of possible encoding types
@@ -283,6 +285,9 @@ struct ByoKinesisConnector {}
/// Data consumer stub for File source with RT consistency
struct RtFileConnector {}

/// Data consumer stub for S3 source with RT consistency
struct RtS3Connector {}

/// Data consumer stub for File source with BYO consistency
struct ByoFileConnector<Out, Err> {
    stream: Receiver<Result<Out, Err>>,
@@ -1055,6 +1060,12 @@ impl Timestamper {
                .map(|connector| RtTimestampConsumer {
                    connector: RtTimestampConnector::Kinesis(connector),
                }),
            ExternalSourceConnector::S3(s3c) => {
                self.create_rt_s3_connector(id, s3c)
                    .map(|connector| RtTimestampConsumer {
                        connector: RtTimestampConnector::S3(connector),
                    })
            }
        }
    }

@@ -1192,6 +1203,14 @@ impl Timestamper {
        Some(RtFileConnector {})
    }

    fn create_rt_s3_connector(
        &self,
        _id: SourceInstanceId,
        _fc: S3SourceConnector,
    ) -> Option<RtS3Connector> {
        Some(RtS3Connector {})
    }

    fn create_byo_ocf_connector(
        &self,
        _id: SourceInstanceId,
@@ -1279,6 +1298,7 @@ impl Timestamper {
                    None => None,
                }
            }
            ExternalSourceConnector::S3(_) => None, // BYO is not supported for s3 sources
        }
    }

1 change: 1 addition & 0 deletions src/dataflow-types/Cargo.toml
@@ -10,6 +10,7 @@ anyhow = "1.0.37"
ccsr = { path = "../ccsr" }
comm = { path = "../comm" }
expr = { path = "../expr" }
globset = { version = "0.4.0", features = ["serde1"] }
interchange = { path = "../interchange" }
kafka-util = { path = "../kafka-util" }
log = "0.4.0"
21 changes: 21 additions & 0 deletions src/dataflow-types/src/types.rs
@@ -19,6 +19,7 @@ use std::path::PathBuf;
use std::time::Duration;

use anyhow::Context;
use globset::Glob;
use log::warn;
use regex::Regex;
use rusoto_core::Region;
@@ -505,15 +506,27 @@ pub enum ExternalSourceConnector {
    Kinesis(KinesisSourceConnector),
    File(FileSourceConnector),
    AvroOcf(FileSourceConnector),
    S3(S3SourceConnector),
}

impl ExternalSourceConnector {
    /// Returns the name and type of each additional metadata column that
    /// Materialize will automatically append to the source's inherent columns.
    ///
    /// Presently, each source type exposes precisely one metadata column that
    /// corresponds to some source-specific record counter. For example, file
    /// sources use a line number, while Kafka sources use a topic offset.
    ///
    /// The columns declared here must be kept in sync with the actual source
    /// implementations that produce these columns.
    pub fn metadata_columns(&self) -> Vec<(ColumnName, ColumnType)> {
        match self {
            Self::Kafka(_) => vec![("mz_offset".into(), ScalarType::Int64.nullable(false))],
            Self::File(_) => vec![("mz_line_no".into(), ScalarType::Int64.nullable(false))],
            Self::Kinesis(_) => vec![("mz_offset".into(), ScalarType::Int64.nullable(false))],
            Self::AvroOcf(_) => vec![("mz_obj_no".into(), ScalarType::Int64.nullable(false))],
            // TODO: should we include object key and possibly object-internal offset here?
            Self::S3(_) => vec![("mz_record".into(), ScalarType::Int64.nullable(false))],
        }
    }

@@ -524,6 +537,7 @@ impl ExternalSourceConnector {
            ExternalSourceConnector::Kinesis(_) => "kinesis",
            ExternalSourceConnector::File(_) => "file",
            ExternalSourceConnector::AvroOcf(_) => "avro-ocf",
            ExternalSourceConnector::S3(_) => "s3",
        }
    }

@@ -623,6 +637,13 @@ pub struct FileSourceConnector {
    pub tail: bool,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct S3SourceConnector {
    pub bucket: String,
    pub objects_pattern: Glob,
    pub aws_info: AwsConnectInfo,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AwsConnectInfo {
    pub region: Region,
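
As an aside, a sketch of how the `objects_pattern` Glob might be used to select
matching object keys; this filtering step is an assumption about how the field
will be consumed, since this commit only defines it. A pattern such as
Glob::new("logs/2021-*.json") would select only keys matching that shape.

use globset::Glob;

fn matching_keys<'a>(pattern: &Glob, keys: &'a [String]) -> Vec<&'a String> {
    // Compile the glob once, then test each candidate S3 key against it.
    let matcher = pattern.compile_matcher();
    keys.iter()
        .filter(|key| matcher.is_match(key.as_str()))
        .collect()
}
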
2 changes: 2 additions & 0 deletions src/dataflow/Cargo.toml
@@ -19,6 +19,7 @@ differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-
dogsdogsdogs = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
expr = { path = "../expr" }
futures = "0.3.0"
globset = "0.4.0"
interchange = { path = "../interchange" }
itertools = "0.9.0"
kafka-util = { path = "../kafka-util" }
@@ -28,6 +29,7 @@ mz-avro = { path = "../avro", features = ["snappy"] }
mz_rusoto_core = "0.46.0"
mz_rusoto_credential = "0.46.0"
mz_rusoto_kinesis = "0.46.0"
mz_rusoto_s3 = "0.46.0"
notify = "4.0.0"
ore = { path = "../ore" }
pdqselect = "0.1.0"
8 changes: 7 additions & 1 deletion src/dataflow/src/render/mod.rs
@@ -135,7 +135,7 @@ use crate::operator::{CollectionExt, StreamExt};
use crate::render::context::{ArrangementFlavor, Context};
use crate::server::{CacheMessage, LocalInput, TimestampDataUpdates, TimestampMetadataUpdates};
use crate::sink;
use crate::source::{self, FileSourceInfo, KafkaSourceInfo, KinesisSourceInfo};
use crate::source::{self, FileSourceInfo, KafkaSourceInfo, KinesisSourceInfo, S3SourceInfo};
use crate::source::{SourceConfig, SourceToken};
use crate::{
arrangement::manager::{TraceBundle, TraceManager},
@@ -414,6 +414,12 @@
                connector,
            )
        }
        ExternalSourceConnector::S3(_) => {
            source::create_source::<_, S3SourceInfo, _>(
                source_config,
                connector,
            )
        }
        ExternalSourceConnector::File(_) => {
            source::create_source::<_, FileSourceInfo<Vec<u8>>, Vec<u8>>(
                source_config,
2 changes: 2 additions & 0 deletions src/dataflow/src/source/mod.rs
@@ -48,6 +48,7 @@ use crate::source::cache::CacheSender;
mod file;
mod kafka;
mod kinesis;
mod s3;
mod util;

pub mod cache;
@@ -58,6 +59,7 @@ pub use file::FileReadStyle;
pub use file::FileSourceInfo;
pub use kafka::KafkaSourceInfo;
pub use kinesis::KinesisSourceInfo;
pub use s3::S3SourceInfo;

/// Shared configuration information for all source types.
pub struct SourceConfig<'a, G> {
