Commit ca55802

Merge pull request #4328 from BohuTANG/dev-copy-list-files-3586
Add files list for the COPY command

BohuTANG committed Mar 7, 2022
2 parents deb5698 + 54bd46e commit ca55802
Showing 23 changed files with 288 additions and 102 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions common/exception/Cargo.toml
@@ -16,6 +16,7 @@ common-arrow = { path = "../arrow" }
 anyhow = "1.0.55"
 backtrace = "0.3.64"
 octocrab = "0.15.4"
+opendal = "0.1.4"
 paste = "1.0.6"
 prost = "0.9.0"
 serde = { version = "1.0.136", features = ["derive"] }
2 changes: 1 addition & 1 deletion common/exception/src/exception_code.rs
@@ -192,7 +192,7 @@ build_exceptions! {
     DalTransportError(3003),
     DalPathNotFound(3004),
     SerdeError(3005),
-    DalS3Error(3006),
+    DalError(3006),
     DalStatError(3007),
 }

7 changes: 7 additions & 0 deletions common/exception/src/exception_into.rs
@@ -228,3 +228,10 @@ impl From<ErrorCode> for tonic::Status {
         }
     }
 }
+
+// OpenDAL error.
+impl From<opendal::error::Error> for ErrorCode {
+    fn from(error: opendal::error::Error) -> Self {
+        ErrorCode::DalError(format!("{:?}", error))
+    }
+}
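With this conversion in place, call sites can rely on `?` to turn any opendal failure into an ErrorCode::DalError. A minimal sketch of such a call site (the helper itself is hypothetical, not part of this PR; it uses only the object()/metadata()/mode() calls that appear in file_s3.rs below):

use common_exception::Result;
use opendal::ObjectMode;
use opendal::Operator;

// Hypothetical helper: metadata() yields opendal::error::Error on failure,
// and `?` converts it via the From impl added above.
async fn object_mode(op: &Operator, path: &str) -> Result<ObjectMode> {
    Ok(op.object(path).metadata().await?.mode())
}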
2 changes: 2 additions & 0 deletions common/io/Cargo.toml
@@ -21,6 +21,8 @@ bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff"
 
 # Crates.io dependencies
 bytes = "1.1.0"
+futures = "0.3.21"
+opendal = "0.1.4"
 serde = { version = "1.0.136", features = ["derive"] }
 
 [dev-dependencies]
105 changes: 105 additions & 0 deletions common/io/src/files/file_s3.rs
@@ -0,0 +1,105 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;

use common_exception::ErrorCode;
use common_exception::Result;
use futures::StreamExt;
use opendal::credential::Credential;
use opendal::ObjectMode;
use opendal::Operator;
use opendal::Reader;

pub struct S3File {}

impl S3File {
    // Open an S3 operator.
    pub async fn open(
        s3_endpoint: &str,
        s3_bucket: &str,
        aws_key_id: &str,
        aws_secret_key: &str,
    ) -> Result<Operator> {
        let mut builder = opendal::services::s3::Backend::build();

        // Endpoint url.
        builder.endpoint(s3_endpoint);

        // Bucket.
        builder.bucket(s3_bucket);

        // Credentials.
        if !aws_key_id.is_empty() {
            let credential = Credential::hmac(aws_key_id, aws_secret_key);
            builder.credential(credential);
        }

        let accessor = builder
            .finish()
            .await
            .map_err(|e| ErrorCode::DalError(format!("s3 dal build error:{:?}", e)))?;
        Ok(opendal::Operator::new(accessor))
    }

    // Read a file and return the reader.
    // `file_name` has the form Some("/path/to/path/xx.csv").
    pub async fn read(
        file_name: Option<String>,
        s3_endpoint: &str,
        s3_bucket: &str,
        aws_key_id: &str,
        aws_secret_key: &str,
    ) -> Result<Reader> {
        let operator = Self::open(s3_endpoint, s3_bucket, aws_key_id, aws_secret_key).await?;
        let path = file_name.unwrap_or_else(|| "".to_string());
        Ok(operator.object(&path).reader())
    }

    // Get the files in the path.
    pub async fn list(
        s3_endpoint: &str,
        s3_bucket: &str,
        path: &str,
        aws_key_id: &str,
        aws_secret_key: &str,
    ) -> Result<Vec<String>> {
        let mut list: Vec<String> = vec![];
        let operator = Self::open(s3_endpoint, s3_bucket, aws_key_id, aws_secret_key).await?;

        // Check whether the path's object mode is DIR or FILE.
        let mode = operator.object(path).metadata().await?.mode();
        match mode {
            ObjectMode::FILE => {
                list.push(path.to_string());
            }
            ObjectMode::DIR => {
                let mut objects = operator.objects(path);
                while let Some(object) = objects.next().await {
                    let meta = object?.metadata().await?;
                    let new_path = Path::new(path).join(meta.path());
                    list.push(new_path.to_string_lossy().to_string());
                }
            }
            other => {
                return Err(ErrorCode::DalError(format!(
                    "S3 list() cannot handle the object mode: {:?}",
                    other
                )))
            }
        }

        Ok(list)
    }
}
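For context, a rough usage sketch of this new API; the endpoint, bucket, and paths are made-up values, and empty credentials skip the Credential::hmac branch in open():

use common_io::prelude::S3File;

async fn demo() -> common_exception::Result<()> {
    let endpoint = "http://127.0.0.1:9900"; // assumed S3-compatible endpoint
    let bucket = "my-bucket";               // made-up bucket name

    // A folder path lists every file under it; a file path returns itself.
    let files = S3File::list(endpoint, bucket, "/path/to/dir", "", "").await?;

    for file in files {
        // Each listed path can then be opened for reading.
        let _reader = S3File::read(Some(file), endpoint, bucket, "", "").await?;
    }
    Ok(())
}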
17 changes: 17 additions & 0 deletions common/io/src/files/mod.rs
@@ -0,0 +1,17 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod file_s3;

pub use file_s3::S3File;
1 change: 1 addition & 0 deletions common/io/src/lib.rs
@@ -20,6 +20,7 @@ pub mod prelude;
 mod binary_read;
 mod binary_write;
 mod buf_read;
+mod files;
 mod marshal;
 mod options_deserializer;
 mod stat_buffer;
1 change: 1 addition & 0 deletions common/io/src/prelude.rs
@@ -20,6 +20,7 @@ pub use crate::binary_write::put_uvarint;
 pub use crate::binary_write::BinaryWrite;
 pub use crate::binary_write::BinaryWriteBuf;
 pub use crate::buf_read::BufReadExt;
+pub use crate::files::S3File;
 pub use crate::marshal::Marshal;
 pub use crate::options_deserializer::OptionsDeserializer;
 pub use crate::options_deserializer::OptionsDeserializerError;
4 changes: 4 additions & 0 deletions common/planners/src/plan_copy.rs
@@ -61,6 +61,7 @@ pub struct CopyPlan {
     pub from: ReadDataSourcePlan,
     pub validation_mode: ValidationMode,
     pub files: Vec<String>,
+    pub pattern: String,
 }
 
 impl CopyPlan {
@@ -77,6 +78,9 @@ impl Debug for CopyPlan {
         if !self.files.is_empty() {
             write!(f, " ,files:{:?}", self.files)?;
         }
+        if !self.pattern.is_empty() {
+            write!(f, " ,pattern:{:?}", self.pattern)?;
+        }
         write!(f, " ,validation_mode:{:?}", self.validation_mode)
     }
 }
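To illustrate what the new Debug fragments produce, a standalone sketch (values made up; the head of the real output comes from the part of fmt() elided in this hunk):

use std::fmt::Write;

fn debug_tail(files: &[String], pattern: &str) -> String {
    let mut s = String::new();
    if !files.is_empty() {
        write!(s, " ,files:{:?}", files).unwrap();
    }
    if !pattern.is_empty() {
        write!(s, " ,pattern:{:?}", pattern).unwrap();
    }
    s
}

// debug_tail(&["books.csv".to_string()], ".*[.]csv")
// returns " ,files:[\"books.csv\"] ,pattern:\".*[.]csv\"".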
78 changes: 71 additions & 7 deletions query/src/interpreters/interpreter_copy.rs
@@ -12,9 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::path::Path;
 use std::sync::Arc;
 
+use common_exception::ErrorCode;
 use common_exception::Result;
+use common_io::prelude::S3File;
+use common_meta_types::StageStorage;
 use common_planners::CopyPlan;
 use common_planners::PlanNode;
 use common_planners::ReadDataSourcePlan;
@@ -24,6 +28,7 @@ use common_streams::ProgressStream;
 use common_streams::SendableDataBlockStream;
 use common_tracing::tracing;
 use futures::TryStreamExt;
+use regex::Regex;
 
 use crate::interpreters::stream::ProcessorExecutorStream;
 use crate::interpreters::Interpreter;
@@ -42,6 +47,49 @@ impl CopyInterpreter {
         Ok(Arc::new(CopyInterpreter { ctx, plan }))
     }
 
+    // List the files to copy. There are two cases here:
+    // 1. If plan.files is not empty, the file set was given on the COPY command with the
+    //    `files=(<file1>, <file2>)` syntax, so we only need to prefix each file with the stage path.
+    // 2. If plan.files is empty, there are again two cases:
+    //    2.1 If the path is a file like /path/to/path/file, S3File::list() returns that same file path.
+    //    2.2 If the path is a folder, S3File::list() returns all the files in it.
+    async fn list_files(&self) -> Result<Vec<String>> {
+        let files = match &self.plan.from.source_info {
+            SourceInfo::S3ExternalSource(table_info) => {
+                let storage = &table_info.stage_info.stage_params.storage;
+                match &storage {
+                    StageStorage::S3(s3) => {
+                        let path = &s3.path;
+
+                        // Prefix each file with the stage path: /path/to/path/file1.
+                        if !self.plan.files.is_empty() {
+                            let mut files_with_path = vec![];
+                            for file in &self.plan.files {
+                                let new_path = Path::new(path).join(file);
+                                files_with_path.push(new_path.to_string_lossy().to_string());
+                            }
+                            Ok(files_with_path)
+                        } else {
+                            let endpoint = &self.ctx.get_config().storage.s3.endpoint_url;
+                            let bucket = &s3.bucket;
+
+                            let key_id = &s3.credentials_aws_key_id;
+                            let secret_key = &s3.credentials_aws_secret_key;
+
+                            S3File::list(endpoint, bucket, path, key_id, secret_key).await
+                        }
+                    }
+                }
+            }
+            other => Err(ErrorCode::LogicalError(format!(
+                "Cannot list files for the source info: {:?}",
+                other
+            ))),
+        };
+
+        files
+    }
+
     // Rewrite the ReadDataSourcePlan.S3ExternalSource.file_name to the new file name.
     fn rewrite_read_plan_file_name(
         mut plan: ReadDataSourcePlan,
@@ -116,14 +164,30 @@
         &self,
         mut _input_stream: Option<SendableDataBlockStream>,
     ) -> Result<SendableDataBlockStream> {
-        let files = self.plan.files.clone();
+        let mut files = self.list_files().await?;
+
+        // Pattern match check.
+        let pattern = &self.plan.pattern;
+        if !pattern.is_empty() {
+            let regex = Regex::new(pattern).map_err(|e| {
+                ErrorCode::SyntaxException(format!(
+                    "Pattern format invalid, got:{}, error:{:?}",
+                    pattern, e
+                ))
+            })?;
+
+            let matched_files = files
+                .iter()
+                .filter(|file| regex.is_match(file))
+                .cloned()
+                .collect();
+            files = matched_files;
+        }
 
-        if files.is_empty() {
-            self.copy_one_file_to_table(None).await?;
-        } else {
-            for file in files {
-                self.copy_one_file_to_table(Some(file)).await?;
-            }
+        tracing::info!("copy file list:{:?}, pattern:{}", &files, pattern,);
+
+        for file in files {
+            self.copy_one_file_to_table(Some(file)).await?;
         }
 
         Ok(Box::pin(DataBlockStream::create(
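The PATTERN filter in execute() is plain regex matching over the listed file names. The same logic as a self-contained sketch (file names made up):

use common_exception::ErrorCode;
use common_exception::Result;
use regex::Regex;

fn filter_files(files: Vec<String>, pattern: &str) -> Result<Vec<String>> {
    if pattern.is_empty() {
        return Ok(files);
    }
    // Invalid patterns surface as a syntax error, as in execute() above.
    let regex = Regex::new(pattern).map_err(|e| {
        ErrorCode::SyntaxException(format!(
            "Pattern format invalid, got:{}, error:{:?}",
            pattern, e
        ))
    })?;
    Ok(files.into_iter().filter(|f| regex.is_match(f)).collect())
}

// filter_files(vec!["dir/books.csv".into(), "dir/books.parquet".into()], ".*[.]csv")
// keeps only "dir/books.csv".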
13 changes: 10 additions & 3 deletions query/src/sessions/query_ctx.rs
@@ -35,6 +35,7 @@ use common_planners::Part;
 use common_planners::Partitions;
 use common_planners::PlanNode;
 use common_planners::ReadDataSourcePlan;
+use common_planners::S3ExternalTableInfo;
 use common_planners::SourceInfo;
 use common_planners::Statistics;
 use common_streams::AbortStream;
@@ -97,7 +98,9 @@
             SourceInfo::TableSource(table_info) => {
                 self.build_table_by_table_info(table_info, plan.tbl_args.clone())
             }
-            SourceInfo::S3ExternalSource(_s3_table_info) => self.build_s3_external_table(),
+            SourceInfo::S3ExternalSource(s3_table_info) => {
+                self.build_s3_external_by_table_info(s3_table_info, plan.tbl_args.clone())
+            }
         }
     }
 
@@ -121,8 +124,12 @@
     // Build an s3 external table by stage info; this is used in:
     // COPY INTO t1 FROM 's3://'
     // 's3://' here is an s3 external stage, and we build it into the external table.
-    fn build_s3_external_table(&self) -> Result<Arc<dyn Table>> {
-        S3ExternalTable::try_create()
+    fn build_s3_external_by_table_info(
+        &self,
+        table_info: &S3ExternalTableInfo,
+        _table_args: Option<Vec<Expression>>,
+    ) -> Result<Arc<dyn Table>> {
+        S3ExternalTable::try_create(table_info.clone())
     }
 
     pub fn get_scan_progress(&self) -> Arc<Progress> {
8 changes: 8 additions & 0 deletions query/src/sql/parsers/parser_copy.rs
@@ -66,6 +66,13 @@
             self.expect_token(")")?;
         }
 
+        // PATTERN = '<regex_pattern>'
+        let mut pattern = "".to_string();
+        if self.consume_token("PATTERN") {
+            self.expect_token("=")?;
+            pattern = self.parse_value_or_ident()?;
+        }
+
         // file_format = (type = csv field_delimiter = '|' skip_header = 1)
         let mut file_format_options = HashMap::default();
         if self.consume_token("FILE_FORMAT") {
@@ -107,6 +114,7 @@
             encryption_options,
             file_format_options,
             files,
+            pattern,
             on_error,
             size_limit,
             validation_mode,
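Putting the grammar together, a statement the new branch accepts might look like the following (table name, stage URL, and pattern are made up):

// Hypothetical COPY statement exercising the new PATTERN clause; the
// FILE_FORMAT options reuse the syntax shown in the comment above.
const COPY_WITH_PATTERN: &str = r#"
COPY INTO mytable
  FROM 's3://mybucket/data/'
  PATTERN = '.*[.]csv'
  FILE_FORMAT = (type = csv field_delimiter = '|' skip_header = 1)
"#;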
