Merge pull request #20 from seddonm1/main
remove the explicit bucket and derive from url like normal s3
matthewmturner committed Jan 23, 2022
2 parents b661537 + 681d681 commit 366bb6c
Showing 3 changed files with 75 additions and 34 deletions.
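In short: `AmazonS3FileSystem::new` no longer takes a bucket argument; the bucket is taken from the first segment of each object path (`"bucket/key"`). Below is a minimal sketch of the new calling convention, condensed from the updated README and tests in this diff; the `"data"` bucket name and the all-`None` configuration are simply the values those examples use, and the snippet is a fragment in the same style as the README (no imports or `main`).

```rust
// The bucket argument is gone; the first path segment ("data") identifies the bucket.
let amazon_s3_file_system = AmazonS3FileSystem::new(
    None, // credentials_provider: fall back to the default AWS credential provider
    None, // region
    None, // endpoint (e.g. a MinIO URL can be supplied here instead)
    None, // retry_config
    None, // sleep
    None, // timeout_config
)
.await;

// Lists objects in the "data" bucket; returned file paths keep the "bucket/key" form.
let mut files = amazon_s3_file_system.list_file("data").await?;
```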
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -19,4 +19,4 @@ datafusion = "6.0.0"
futures = "0.3.19"
http = "0.2.6"
num_cpus = "1.13.1"
tokio = {version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"]}
tokio = {version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"]}
7 changes: 2 additions & 5 deletions README.md
@@ -4,7 +4,7 @@ Enable S3 as an ObjectStore for Datafusion

## Querying files on S3 with DataFusion

This crate can be used for interacting with both AWS S3 and implementers of the S3 standard. Examples for querying AWS and other implementors, such as Minio, are shown below.
This crate can be used for interacting with both AWS S3 and implementers of the S3 standard. Examples for querying AWS and other implementors, such as MinIO, are shown below.

```rust
// Load credentials from default AWS credential provider (such as environment or ~/.aws/credentials)
@@ -16,7 +16,6 @@ let amazon_s3_file_system = Arc::new(
None,
None,
None,
BUCKET,
)
.await,
);
@@ -27,7 +26,6 @@ const ACCESS_KEY_ID: &str = "AKIAIOSFODNN7EXAMPLE";
const SECRET_ACCESS_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
const PROVIDER_NAME: &str = "Static";
const MINIO_ENDPOINT: &str = "http://localhost:9000";
const BUCKET: &str = "data";

// Load credentials from default AWS credential provider (such as environment or ~/.aws/credentials)
let amazon_s3_file_system = AmazonS3FileSystem::new(
@@ -43,13 +41,12 @@ let amazon_s3_file_system = AmazonS3FileSystem::new(
None,
None,
None,
BUCKET,
)
.await;
```

```rust
let filename = "alltypes_plain.snappy.parquet";
let filename = "data/alltypes_plain.snappy.parquet";

let listing_options = ListingOptions {
format: Arc::new(ParquetFormat::default()),
100 changes: 72 additions & 28 deletions src/object_store/aws.rs
@@ -40,8 +40,9 @@ use aws_types::credentials::SharedCredentialsProvider;
use bytes::Buf;

/// new_client creates a new aws_sdk_s3::Client
/// at time of writing the aws_config::load_from_env() does not allow configuring the endpoint which is
/// required for continuous integration testing and uses outside the AWS ecosystem
/// this uses aws_config::load_from_env() as a base config then allows users to override specific settings if required
///
/// an example use case for overriding is to specify an endpoint which is not Amazon S3 such as MinIO or Ceph.
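/// for example, the tests below pass `Some(Endpoint::immutable(Uri::from_static("http://localhost:9000")))` to point the client at a local MinIO instance while every other setting still comes from the environment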
async fn new_client(
credentials_provider: Option<SharedCredentialsProvider>,
region: Option<Region>,
@@ -91,7 +92,6 @@ pub struct AmazonS3FileSystem {
retry_config: Option<RetryConfig>,
sleep: Option<Arc<dyn AsyncSleep>>,
timeout_config: Option<TimeoutConfig>,
bucket: String,
client: Client,
}

@@ -103,7 +103,6 @@ impl AmazonS3FileSystem {
retry_config: Option<RetryConfig>,
sleep: Option<Arc<dyn AsyncSleep>>,
timeout_config: Option<TimeoutConfig>,
bucket: &str,
) -> Self {
Self {
credentials_provider: credentials_provider.clone(),
@@ -112,7 +111,6 @@ impl AmazonS3FileSystem {
retry_config: retry_config.clone(),
sleep: sleep.clone(),
timeout_config: timeout_config.clone(),
bucket: bucket.to_string(),
client: new_client(credentials_provider, region, endpoint, None, None, None).await,
}
}
@@ -121,10 +119,15 @@ impl AmazonS3FileSystem {
#[async_trait]
impl ObjectStore for AmazonS3FileSystem {
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
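// the first path segment is the bucket; anything after the first '/' is used as the listing prefix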
let (bucket, prefix) = match prefix.split_once("/") {
Some((bucket, prefix)) => (bucket.to_owned(), prefix),
None => (prefix.to_owned(), ""),
};

let objects = self
.client
.list_objects_v2()
.bucket(&self.bucket)
.bucket(&bucket)
.prefix(prefix)
.send()
.await
@@ -133,10 +136,10 @@ impl ObjectStore for AmazonS3FileSystem {
.unwrap_or_default()
.to_vec();

let result = stream::iter(objects.into_iter().map(|object| {
let result = stream::iter(objects.into_iter().map(move |object| {
Ok(FileMeta {
sized_file: SizedFile {
path: object.key().unwrap_or("").to_string(),
path: format!("{}/{}", &bucket, object.key().unwrap_or("")),
size: object.size() as u64,
},
last_modified: object
@@ -160,7 +163,6 @@ impl ObjectStore for AmazonS3FileSystem {
self.retry_config.clone(),
self.sleep.clone(),
self.timeout_config.clone(),
&self.bucket,
file,
)?))
}
@@ -173,7 +175,6 @@ struct AmazonS3FileReader {
retry_config: Option<RetryConfig>,
sleep: Option<Arc<dyn AsyncSleep>>,
timeout_config: Option<TimeoutConfig>,
bucket: String,
file: SizedFile,
}

@@ -186,7 +187,6 @@ impl AmazonS3FileReader {
retry_config: Option<RetryConfig>,
sleep: Option<Arc<dyn AsyncSleep>>,
timeout_config: Option<TimeoutConfig>,
bucket: &str,
file: SizedFile,
) -> Result<Self> {
Ok(Self {
@@ -196,7 +196,6 @@ impl AmazonS3FileReader {
retry_config,
sleep,
timeout_config,
bucket: bucket.to_string(),
file,
})
}
@@ -215,8 +214,7 @@ impl ObjectReader for AmazonS3FileReader {
let retry_config = self.retry_config.clone();
let sleep = self.sleep.clone();
let timeout_config = self.timeout_config.clone();
let bucket = self.bucket.clone();
let key = self.file.path.clone();
let file_path = self.file.path.clone();

// once the async chunk file readers have been implemented this complexity can be removed
let (tx, rx) = mpsc::channel();
@@ -238,6 +236,11 @@ impl ObjectReader for AmazonS3FileReader {
)
.await;

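// FileMeta paths are stored as "bucket/key", so split the path back apart before building the GetObject request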
let (bucket, key) = match file_path.split_once("/") {
Some((bucket, prefix)) => (bucket, prefix),
None => (file_path.as_str(), ""),
};

let get_object = client.get_object().bucket(bucket).key(key);
let resp = if length > 0 {
// range bytes requests are inclusive
@@ -292,7 +295,6 @@ mod tests {
const SECRET_ACCESS_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
const PROVIDER_NAME: &str = "Static";
const MINIO_ENDPOINT: &str = "http://localhost:9000";
const BUCKET: &str = "data";

#[tokio::test]
async fn test_read_files() -> Result<()> {
@@ -309,11 +311,10 @@ mod tests {
None,
None,
None,
BUCKET,
)
.await;

let mut files = amazon_s3_file_system.list_file("").await?;
let mut files = amazon_s3_file_system.list_file("data").await?;

while let Some(file) = files.next().await {
let sized_file = file.unwrap().sized_file;
@@ -356,11 +357,10 @@ mod tests {
None,
None,
None,
BUCKET,
)
.await;
let mut files = amazon_s3_file_system
.list_file("alltypes_plain.snappy.parquet")
.list_file("data/alltypes_plain.snappy.parquet")
.await?;

if let Some(file) = files.next().await {
@@ -397,12 +397,11 @@ mod tests {
None,
None,
None,
BUCKET,
)
.await,
);

let filename = "alltypes_plain.snappy.parquet";
let filename = "data/alltypes_plain.snappy.parquet";

let listing_options = ListingOptions {
format: Arc::new(ParquetFormat::default()),
@@ -445,12 +444,11 @@ mod tests {
None,
None,
None,
BUCKET,
)
.await,
);

let filename = "alltypes_plain.snappy.parquet";
let filename = "data/alltypes_plain.snappy.parquet";

let listing_options = ListingOptions {
format: Arc::new(ParquetFormat::default()),
@@ -477,15 +475,61 @@

let batches = ctx.sql("SELECT * FROM tbl").await?.collect().await?;
let expected = vec![
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
"| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
"| 6 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30342f30312f3039 | 30 | 2009-04-01 00:00:00 |",
"| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01 00:01:00 |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
"| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
"| 6 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30342f30312f3039 | 30 | 2009-04-01 00:00:00 |",
"| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01 00:01:00 |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+"
];
assert_batches_eq!(&expected, &batches);

Ok(())
}

#[tokio::test]
#[should_panic(expected = "Could not parse metadata: bad data")]
async fn test_read_alternative_bucket() {
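// reads from a second bucket ("bad_data"); the file there appears to be deliberately unreadable
// (the test expects a metadata parsing panic), which exercises bucket resolution from the path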
let amazon_s3_file_system = Arc::new(
AmazonS3FileSystem::new(
Some(SharedCredentialsProvider::new(Credentials::new(
ACCESS_KEY_ID,
SECRET_ACCESS_KEY,
None,
None,
PROVIDER_NAME,
))),
None,
Some(Endpoint::immutable(Uri::from_static(MINIO_ENDPOINT))),
None,
None,
None,
)
.await,
);

let filename = "bad_data/PARQUET-1481.parquet";

let listing_options = ListingOptions {
format: Arc::new(ParquetFormat::default()),
collect_stat: true,
file_extension: "parquet".to_owned(),
target_partitions: num_cpus::get(),
table_partition_cols: vec![],
};

let resolved_schema = listing_options
.infer_schema(amazon_s3_file_system.clone(), filename)
.await
.unwrap();

let table = ListingTable::new(
amazon_s3_file_system,
filename.to_owned(),
resolved_schema,
listing_options,
);

table.scan(&None, 1024, &[], None).await.unwrap();
}
}
