
Improve the COPY from external location performance #4308

Closed
Tracked by #3586
BohuTANG opened this issue Mar 3, 2022 · 19 comments · Fixed by #4783
BohuTANG (Member) commented Mar 3, 2022:

Summary
If we COPY a file from an S3 location and insert it into a table, the steps are:
S1. Read the file in blocks from the S3 location
S2. Write the block stream to table t1
S3. Commit

let executor = PipelinePullingExecutor::try_create(pipeline)?;
let source_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
let progress_stream = Box::pin(ProgressStream::try_create(
    source_stream,
    ctx.get_scan_progress(),
)?);
let table = ctx
    .get_table(&self.plan.db_name, &self.plan.tbl_name)
    .await?;
let operations = table
    .append_data(ctx.clone(), progress_stream)
    .await?
    .try_collect()
    .await?;
// Commit.
table
    .commit_insertion(ctx.clone(), operations, false)
    .await?;

S1 and S2 run in the same thread; it looks like we could run them in parallel.

@BohuTANG BohuTANG added A-query Area: databend query good first issue Category: good first issue labels Mar 3, 2022
GrapeBaBa (Contributor):

/assignme

GrapeBaBa (Contributor):

@BohuTANG Since I'm not very familiar with Rust threads, I have a question about this issue. I believe the source stream items are produced by the pipeline executor, which already seems to run on other threads, right? And the consumer runs in the current thread?

sundy-li (Member) commented Mar 10, 2022:

I think if we transform the COPY plan into an insert-select plan, the pipeline engine will run it in parallel.
We can do it after #4345
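Conceptually, the rewrite treats the staged file as just another readable source, so the existing pipeline engine parallelizes the read and write stages for free (hypothetical SQL, for illustration only — the actual rewrite happens at the plan level):

```sql
-- Original statement:
COPY INTO t1 FROM 's3://bucket/path/file.csv' FILE_FORMAT = (TYPE = CSV);

-- Rewritten internally into the shape of an insert-select,
-- where <staged-file-source> is a table-like reader over the file:
INSERT INTO t1 SELECT * FROM <staged-file-source>;
```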

sundy-li (Member):

@GrapeBaBa

It's ready to do it now.

We can build the pipeline through the new processor; see the example.

Feel free to contact me if you need any help.

GrapeBaBa (Contributor):

@sundy-li Got it.

GrapeBaBa (Contributor):

@sundy-li Should we use a sink pipe and a source pipe to create a complete pipeline for this refactoring?

sundy-li (Member) commented Apr 3, 2022:

Should we use a sink pipe and a source pipe to create a complete pipeline for refactoring?

Yes!

GrapeBaBa (Contributor):

@BohuTANG @sundy-li Do we have a way to test the COPY INTO feature locally?

BohuTANG (Member, Author) commented Apr 3, 2022:

Run databend-query with disk storage:

  1. Create a named internal stage
  2. Stream a file into the stage via the HTTP streaming_load API
  3. Run COPY from the internal stage

GrapeBaBa (Contributor):

OK. I also found some tests in the code that seem to use MinIO, but without detailed steps. Also, why is the interpreter_copy unit test missing? Is it difficult to mock, or is there some other reason?

sundy-li (Member) commented Apr 3, 2022:

interpreter_copy unit test

Yes, it's hard to write a stateful unit test for that, so the tests must run against MinIO.

@GrapeBaBa
Copy link
Contributor

I uploaded a file to the stage using:

curl -H "stage_name:my_internal_stage" -F "upload=@./books.csv" -XPUT http://localhost:8081/v1/upload_to_stage

and got this error:

unexpected: (op: write, path: /Users/kaichen/Documents/projects/databend/target/debug/benddata/datas/stage/my_internal_stage, source: File exists (os error 17))

GrapeBaBa (Contributor):

The log shows

2022-04-03T12:47:38.884447Z ERROR opendal::services::fs::backend: object /Users/kaichen/Documents/projects/databend/target/debug/benddata/datas/stage/my_internal_stage/books.csv create_dir_all for parent /Users/kaichen/Documents/projects/databend/target/debug/benddata/datas/stage/my_internal_stage: Custom { kind: AlreadyExists, error: ObjectError { op: "write", path: "/Users/kaichen/Documents/projects/databend/target/debug/benddata/datas/stage/my_internal_stage", source: File exists (os error 17) } }
2022-04-03T12:47:38.884878Z  WARN databend_query::servers::http::middleware: http request error: status=500 Internal Server Error, msg=op: write, path: /Users/kaichen/Documents/projects/databend/target/debug/benddata/datas/stage/my_internal_stage, source: File exists (os error 17)

GrapeBaBa (Contributor) commented Apr 3, 2022:

After

create stage xxx;

an xxx file is generated in the local directory. Is that expected? @BohuTANG @sundy-li

GrapeBaBa (Contributor):

-rw-r--r--  1 kaichen  staff    0 Apr  3 19:47 my_internal_stage
-rw-r--r--  1 kaichen  staff    0 Apr  3 20:12 test
-rw-r--r--  1 kaichen  staff    0 Apr  3 20:26 test1
-rw-r--r--  1 kaichen  staff    0 Apr  3 20:54 test2

BohuTANG (Member, Author) commented Apr 4, 2022:

xxx should be a folder under the stage directory, not a file, so this looks like a bug:

if user_stage.stage_type == StageType::Internal {
    let prefix = format!("stage/{}/", user_stage.stage_name);
    let op = self.ctx.get_storage_operator()?;
    let obj = op.object(&prefix);
    // Write a file, then delete it, to ensure the directory is created.
    // TODO(xuanwo): opendal support for mkdir (https://github.com/datafuselabs/opendal/issues/151)
    // This is not compatible with the fs backend.
    let meta = obj.metadata().await;
    if meta.is_err() {
        let file_obj = op.object(&prefix);
        let _ = file_obj.write("").await?;
    }
}

GrapeBaBa (Contributor):

Yes, it should be a bug, since I have already tested this successfully using MinIO. Let me try to fix it.

GrapeBaBa (Contributor):

#4783 (comment) @zhang2014 @sundy-li @BohuTANG Coming back here: I looked at the MemoryTable code and see that we can use one pipe to implement it. However, I'm not sure about using S3StageTable. If I understand correctly, this copy operation should also handle the internal stage, which does not use S3 storage. What is your suggestion?

GrapeBaBa (Contributor):

I finally realized that S3StageTable can actually handle both internal and external stage sources. Maybe naming it StageTable would make more sense.
