Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support InsertInto Sorted ListingTable #7743

Merged
merged 4 commits into from
Oct 8, 2023

Conversation

devinjdangelo
Copy link
Contributor

@devinjdangelo devinjdangelo commented Oct 5, 2023

Which issue does this PR close?

Closes #7354

Rationale for this change

See issue

What changes are included in this PR?

Allows specifying a required_input_ordering on a FileSinkExec which forces the output to be sorted in a particular way. The setting is optional, so Copy To can keep its existing behavior while ListingTable can optionally inject its required sort order.

Are these changes tested?

Yes, via a new sqllogic test

Are there any user-facing changes?

Inserting to a table with required order works and maintains required file ordering

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Oct 5, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really cool @devinjdangelo -- Thank you

I tried it out and and it works as advertised 🦾 . I had one request for a test, but I can add it as a follow on if you prefer.

I found a few other issues, but I don't think they are caused by this PR

$ mkdir /tmp/output
$ datafusion-cli
DataFusion CLI v31.0.0
❯ create external table output(time timestamp) stored as parquet location '/tmp/output' with order (time);
0 rows in set. Query took 0.003 seconds.

❯ insert into output values (now()), (now() - interval '1 minute'), (now() + interval '2 minutes'), (now() - interval '3 minutes');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row in set. Query took 0.144 seconds.

❯ select * from output;
+----------------------------+
| time                       |
+----------------------------+
| 2023-10-06T20:31:08.042535 |
| 2023-10-06T20:33:08.042535 |
| 2023-10-06T20:34:08.042535 |
| 2023-10-06T20:36:08.042535 |
+----------------------------+
4 rows in set. Query took 0.005 seconds.

However, if I insert the same data again, now the data is not sorted!

❯ insert into output values (now()), (now() - interval '1 minute'), (now() + interval '2 minutes'), (now() - interval '3 minutes');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row in set. Query took 0.178 seconds.

❯ select * from output;
+----------------------------+
| time                       |
+----------------------------+
| 2023-10-06T20:32:38.514236 |
| 2023-10-06T20:34:38.514236 |
| 2023-10-06T20:35:38.514236 |
| 2023-10-06T20:37:38.514236 |
| 2023-10-06T20:31:08.042535 |
| 2023-10-06T20:33:08.042535 |
| 2023-10-06T20:34:08.042535 |
| 2023-10-06T20:36:08.042535 |
+----------------------------+
8 rows in set. Query took 0.005 seconds.

I also found that there were a huge number of empty output files created

alamb@MacBook-Pro-8:~/Software/arrow-datafusion2/datafusion-cli$ ls -ltr /tmp/output
total 256
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_1.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_4.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_8.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_10.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_2.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_7.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_13.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_9.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_3.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_12.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_6.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_14.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_15.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_11.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_5.parquet
-rw-r--r--@ 1 alamb  wheel   615B Oct  6 16:34 1PHmXyyoDVGbi7oo_0.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_2.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_5.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_6.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_4.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_12.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_11.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_3.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_9.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_8.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_14.parquet
-rw-r--r--@ 1 alamb  wheel   615B Oct  6 16:35 FtsEcvDwXi7JVaVq_0.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_7.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_15.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_10.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_13.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_1.parquet

But I don't think this is caused by this PR - #5383

query II
INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
----
9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also add an EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2) to verify that the plan has a `SortExec in it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed an update with explain test.

@@ -73,6 +73,8 @@ pub struct FileSinkExec {
sink_schema: SchemaRef,
/// Schema describing the structure of the output data.
count_schema: SchemaRef,
/// Optional required sort order for output data.
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since FileSink can have only a single input, I think it only needs a single sort order per required_input_order

In other words, I think this could be simplified to

Suggested change
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
sort_order: Option<Vec<PhysicalSortRequirement>>>,

And then adjust required_input_order appropriately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. Will do!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed an update with this change

@@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner {
FileType::ARROW => Arc::new(ArrowFormat {}),
};

sink_format.create_writer_physical_plan(input_exec, session_state, config).await
sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb
Copy link
Contributor

alamb commented Oct 6, 2023

However, if I insert the same data again, now the data is not sorted!

I couldn't reproduce this locally with smaller reproducer and I am out of time. I'll investigate more if I have time

@devinjdangelo
Copy link
Contributor Author

devinjdangelo commented Oct 6, 2023

I found a few other issues, but I don't think they are caused by this PR

$ mkdir /tmp/output
$ datafusion-cli
DataFusion CLI v31.0.0
❯ create external table output(time timestamp) stored as parquet location '/tmp/output' with order (time);
0 rows in set. Query took 0.003 seconds.

❯ insert into output values (now()), (now() - interval '1 minute'), (now() + interval '2 minutes'), (now() - interval '3 minutes');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row in set. Query took 0.144 seconds.

❯ select * from output;
+----------------------------+
| time                       |
+----------------------------+
| 2023-10-06T20:31:08.042535 |
| 2023-10-06T20:33:08.042535 |
| 2023-10-06T20:34:08.042535 |
| 2023-10-06T20:36:08.042535 |
+----------------------------+
4 rows in set. Query took 0.005 seconds.

However, if I insert the same data again, now the data is not sorted!

❯ insert into output values (now()), (now() - interval '1 minute'), (now() + interval '2 minutes'), (now() - interval '3 minutes');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row in set. Query took 0.178 seconds.

❯ select * from output;
+----------------------------+
| time                       |
+----------------------------+
| 2023-10-06T20:32:38.514236 |
| 2023-10-06T20:34:38.514236 |
| 2023-10-06T20:35:38.514236 |
| 2023-10-06T20:37:38.514236 |
| 2023-10-06T20:31:08.042535 |
| 2023-10-06T20:33:08.042535 |
| 2023-10-06T20:34:08.042535 |
| 2023-10-06T20:36:08.042535 |
+----------------------------+
8 rows in set. Query took 0.005 seconds.

I actually think the above is correct behavior. The table is not globally sorted, but rather each individual file is sorted. Each time you insert, at least one new file is inserted. In the above result we see two independently sorted chunks, which means each insert created one new sorted file.

I also found that there were a huge number of empty output files created

alamb@MacBook-Pro-8:~/Software/arrow-datafusion2/datafusion-cli$ ls -ltr /tmp/output
total 256
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_1.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_4.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_8.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_10.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_2.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_7.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_13.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_9.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_3.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_12.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_6.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_14.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_15.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_11.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_5.parquet
-rw-r--r--@ 1 alamb  wheel   615B Oct  6 16:34 1PHmXyyoDVGbi7oo_0.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_2.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_5.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_6.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_4.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_12.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_11.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_3.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_9.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_8.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_14.parquet
-rw-r--r--@ 1 alamb  wheel   615B Oct  6 16:35 FtsEcvDwXi7JVaVq_0.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_7.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_15.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_10.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_13.parquet
-rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_1.parquet

But I don't think this is caused by this PR - #5383

Yeah, FileSinks currently output 1 file for each input stream. That number is determined by the plan/optimizer. Based on your result, I would guess that your system has 16 vcores, so you end up with 1 stream containing data and 15 empty streams.

We could solve this by making FileSinks a little more intelligent and dynamic in how they partition the output. For example, they could be configured with a target file size. This idea is related to what will be required for #7744 .

@alamb
Copy link
Contributor

alamb commented Oct 6, 2023

I actually think the above is correct behavior. The table is not globally sorted, but rather each individual file is sorted. Each time you insert, at least one new file is inserted. In the above result we see two independently sorted chunks, which means each insert created one new sorted file.

Yes, I think you are right. However, when I did an EXPLAIN plan I expect to see no Sorts (since each file is sorted, they can just be merged with SortPreservingMerge):

❯ explain select * from output order by time;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: output.time ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   TableScan: output projection=[time]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   SortExec: expr=[time@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |     ParquetExec: file_groups={16 groups: [[private/tmp/output/FtsEcvDwXi7JVaVq_6.parquet, private/tmp/output/FtsEcvDwXi7JVaVq_12.parquet], [private/tmp/output/1PHmXyyoDVGbi7oo_5.parquet, private/tmp/output/1PHmXyyoDVGbi7oo_12.parquet], [private/tmp/output/1PHmXyyoDVGbi7oo_4.parquet, private/tmp/output/1PHmXyyoDVGbi7oo_13.parquet], [private/tmp/output/FtsEcvDwXi7JVaVq_13.parquet, private/tmp/output/FtsEcvDwXi7JVaVq_7.parquet], [private/tmp/output/1PHmXyyoDVGbi7oo_11.parquet, private/tmp/output/1PHmXyyoDVGbi7oo_6.parquet], ...]}, projection=[time] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.003 seconds.

But now that I look at that plan, perhaps the issue is that there is more than one file in each group, so the sort order can't be maintained 🤔

@devinjdangelo
Copy link
Contributor Author

But now that I look at that plan, perhaps the issue is that there is more than one file in each group, so the sort order can't be maintained 🤔

Perhaps if each output file were larger we would avoid that issue. It would be nice if you could configure a desired file output size rather than relying on the plan partitioning... I opened #7767 for this.

@alamb
Copy link
Contributor

alamb commented Oct 8, 2023

But now that I look at that plan, perhaps the issue is that there is more than one file in each group, so the sort order can't be maintained 🤔

Perhaps if each output file were larger we would avoid that issue. It would be nice if you could configure a desired file output size rather than relying on the plan partitioning... I opened #7767 for this.

I think the issue is explained here: https://github.com/apache/arrow-datafusion/blob/3d1b23a04bdc04c526e2dcb06e0cf1995707587d/datafusion/core/src/datasource/physical_plan/mod.rs#L408-L467

(which is not all that easy to find)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love it - thank you @devinjdangelo

@@ -907,17 +907,19 @@ impl TableProvider for ListingTable {
"Cannot insert into a sorted ListingTable with mode append!".into(),
));
}
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb merged commit 258a7cd into apache:main Oct 8, 2023
22 checks passed
@andygrove andygrove added the enhancement New feature or request label Nov 5, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate enhancement New feature or request sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow inserts to a sorted ListingTable
3 participants