
ARROW-15260: [R] open_dataset - add file_name as column #12826

Merged 27 commits into apache:master on Aug 10, 2022

Conversation

@thisisnic (Member)

No description provided.

@github-actions bot commented Apr 7, 2022

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@thisisnic (Member Author) commented Apr 7, 2022

Currently this fails with this error:

```
Error in `handle_csv_read_error()` at r/R/dplyr-collect.R:33:6:
! Invalid: No match for FieldRef.Name(__filename) in int: int32
dbl: double
lgl: bool
chr: string
fct: dictionary<values=string, indices=int32, ordered=0>
ts: timestamp[us, tz=UTC]
group: int32
other: string
```

I think it's something to do with the fact that the new column is not in the schema; if I try to print the arrow_dplyr_query object before I collect, I get:

```
Error in schm$GetFieldByName(name)$type$ToString() :
  attempt to apply non-function
```

Which appears to come from here:

```r
schm$GetFieldByName(name)$type$ToString()
```

I can successfully run:

```r
ds %>%
  mutate(x = Expression$field_ref("chr")) %>%
  collect()
```

and when I run:

```r
ds %>%
  mutate(x = Expression$field_ref("made_up_name")) %>%
  collect()
```

I get the same error (No match for FieldRef.Name), which makes me think that we need to do something higher up somewhere, as we're not even picking up the augmented field.

@nealrichardson (Member)

> Currently this fails with this error:

If you haven't already, can you build arrow with `-DARROW_EXTRA_ERROR_CONTEXT=ON` and include the C++ traceback from the error? From the error message, it sounds like it's coming from C++, not R.

Re: the printing error, we'll have to handle that somehow. I wonder how many other places we assume that the schema contains all possible valid field refs; I also think that the C++ Dataset layer should have a way of handling this better than having us special-case and sniff for these "augmented columns".

(Side note: `Error in 'handle_csv_read_error()'` is misleading and should be solvable; we're catching errors and inspecting for a CSV read error message, and we rethrow the error if it's not that, but then the error shows as coming from the error handler rather than the original function. Surely there is a way to handle this, in rlang or otherwise.)
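
To make the side note concrete, here is a minimal sketch of one way to do this with rlang, threading a `call` argument through so the rethrown error is attributed to the user-facing function rather than the handler. The handler body and messages are illustrative, not the exact code that landed in #12839:

```r
library(rlang)

# Hypothetical handler: `call` defaults to the handler's caller, so any
# error it raises is reported as coming from that function instead of
# from handle_csv_read_error() itself.
handle_csv_read_error <- function(e, call = caller_env()) {
  msg <- conditionMessage(e)
  if (grepl("CSV parse error", msg, fixed = TRUE)) {
    abort("Problem parsing the CSV file.", parent = e, call = call)
  }
  # Not a CSV read error: rethrow it, still attributed to the caller.
  abort(msg, parent = e, call = call)
}
```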

@thisisnic (Member Author) commented Apr 8, 2022

@nealrichardson I'd just trimmed it as I didn't realise it was relevant; here's the rest of it:

```
/home/nic2/arrow/cpp/src/arrow/type.h:1717  CheckNonEmpty(matches, root)
/home/nic2/arrow/cpp/src/arrow/compute/exec/expression.cc:397  ref->FindOne(in)
/home/nic2/arrow/cpp/src/arrow/compute/exec/expression.cc:410  BindImpl(std::move(argument), in, shape, exec_context)
```

@thisisnic (Member Author)

@nealrichardson I've submitted #12839, a PR which handles the issue described in your side note above.

@nealrichardson (Member)

> @nealrichardson I'd just trimmed it as I didn't realise it was relevant; here's the rest of it:
>
> ```
> /home/nic2/arrow/cpp/src/arrow/type.h:1717  CheckNonEmpty(matches, root)
> /home/nic2/arrow/cpp/src/arrow/compute/exec/expression.cc:397  ref->FindOne(in)
> /home/nic2/arrow/cpp/src/arrow/compute/exec/expression.cc:410  BindImpl(std::move(argument), in, shape, exec_context)
> ```

Great. So that points to where in C++ the validation needs to change. Can you see if this is the same issue as Weston identified, or if it needs to be a separate one? (IIRC the issue he created was about Scanner and this is not using Scanner, but maybe it boils down to the same thing.)

thisisnic added a commit that referenced this pull request Apr 13, 2022
…and `handle_parquet_io_error()` need better error tracing

As discussed on #12826

Not sure how (or if) to write tests, but I tried running it locally using the CSV directory set up in `test-dataset-csv.R` with and without this change. Without it, we get, e.g.

```
open_dataset(csv_dir)
# Error in `handle_parquet_io_error()` at r/R/dataset.R:221:6:
# ! Invalid: Error creating dataset. Could not read schema from '/tmp/RtmpuTyOD8/file5049dcf581a5/5/file1.csv': Could not open Parquet input source '/tmp/RtmpuTyOD8/file5049dcf581a5/5/file1.csv': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
# /home/nic2/arrow/cpp/src/arrow/dataset/file_parquet.cc:323  GetReader(source, scan_options). Is this a 'parquet' file?
# /home/nic2/arrow/cpp/src/arrow/dataset/discovery.cc:40  InspectSchemas(std::move(options))
# /home/nic2/arrow/cpp/src/arrow/dataset/discovery.cc:262  Inspect(options.inspect_options)
# ℹ Did you mean to specify a 'format' other than the default (parquet)?
```

and then with it:

```
open_dataset(csv_dir)
# Error in `open_dataset()`:
# ! Invalid: Error creating dataset. Could not read schema from '/tmp/RtmpLbqZs6/file4e4ca14fb5795/5/file1.csv': Could not open Parquet input source '/tmp/RtmpLbqZs6/file4e4ca14fb5795/5/file1.csv': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
# /home/nic2/arrow/cpp/src/arrow/dataset/file_parquet.cc:323  GetReader(source, scan_options). Is this a 'parquet' file?
# /home/nic2/arrow/cpp/src/arrow/dataset/discovery.cc:40  InspectSchemas(std::move(options))
# /home/nic2/arrow/cpp/src/arrow/dataset/discovery.cc:262  Inspect(options.inspect_options)
# ℹ Did you mean to specify a 'format' other than the default (parquet)?
```

Closes #12839 from thisisnic/ARROW-16154_error_trace

Authored-by: Nic Crane <thisisnic@gmail.com>
Signed-off-by: Nic Crane <thisisnic@gmail.com>
@thisisnic (Member Author)

> Great. So that points to where in C++ the validation needs to change. Can you see if this is the same issue as Weston identified, or if it needs to be a separate one? (IIRC the issue he created was about Scanner and this is not using Scanner, but maybe it boils down to the same thing.)

@nealrichardson Sorry, this is an old ticket that I'm just picking up now, but how would I know if this is the same issue that Weston identified? I had a skim through the C++ code that is causing the error but don't really understand it. @westonpace, do you know if it's likely to be the same thing?

@nealrichardson (Member)

I'm missing something here and I can't see what it is. I see that the ScanNode gets the "augmented fields" added to its schema (https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/scanner.cc#L924-L932), so subsequent nodes should see them in the output_schema and be able to use them.

If I comment out these lines so the augmented fields aren't projected away (https://github.com/apache/arrow/blob/master/r/R/query-engine.R#L150-L151) and just do `collect(ds)`, I see the augmented fields in the result. But with those lines in place, if I do `mutate(files = Expression$field_ref("__filename"))` and then collect, I get the error that @thisisnic showed above.

I can't rule out that there's an extra Project happening somewhere in R that I'm not seeing, which is dropping the columns. I think I need to implement the ExecPlan::ToString bindings to help solve this.

@westonpace (Member)

I don't think it is an extra project. The failure is coming from the two calls to Bind in ExecNode_Scan. The code there is binding both the projection and the filter to the dataset schema. This schema does not include the augmented fields, and thus the bind fails. If I remove those two calls to bind, everything works.

Inside the scanner, we check both expressions. If they are not bound, we bind them to the augmented dataset schema (which isn't exactly available to the caller and does include the augmented fields).

So I think the right answer here is to just remove those calls to bind. They don't exist on the equivalent Python path. Binding expressions seems like an implementation detail anyway, and we can probably move away from anyone outside arrow-cpp having to know about the concept.
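
To make the proposed end state concrete, here is a minimal sketch of the behaviour being described, assuming the premature Bind calls are removed: the unbound field ref flows through to the scanner, which binds it against the augmented dataset schema internally.

```r
library(arrow)
library(dplyr)

tf <- tempfile()
dir.create(tf)
write_dataset(mtcars, tf, partitioning = "cyl")

# The reference to the augmented __filename column is left unbound here;
# with the fix, it resolves at scan time instead of failing against the
# (non-augmented) dataset schema.
open_dataset(tf) %>%
  mutate(file = Expression$field_ref("__filename")) %>%
  collect()
```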

@thisisnic thisisnic marked this pull request as ready for review July 18, 2022 08:22
@dragosmg (Contributor) left a comment


LGTM with one comment/question.

Review threads on r/tests/testthat/test-dataset.R and r/R/dataset.R (all resolved).
@thisisnic (Member Author)

OK, more is still needed here than I thought. I've now added it as an NSE function, but I get an error when trying to print things (likely because we're looking at the schema, which the new column hasn't been added to). As @nealrichardson said above, "we assume that the schema contains all possible valid field refs". I'm not sure if there's a better way to get this info using something else.

```r
library(arrow)
library(dplyr)

tf <- tempfile()
dir.create(tf)
write_dataset(mtcars, tf, partitioning = "cyl")

# this works and returns the dataset with the augmented file correctly added
open_dataset(tf) %>%
  mutate(filename = add_filename()) %>%
  collect()
#>     mpg  disp  hp drat    wt  qsec vs am gear carb cyl
#> 1  18.7 360.0 175 3.15 3.440 17.02  0  0    3    2   8
#> 2  14.3 360.0 245 3.21 3.570 15.84  0  0    3    4   8
#> 3  16.4 275.8 180 3.07 4.070 17.40  0  0    3    3   8
#> 4  17.3 275.8 180 3.07 3.730 17.60  0  0    3    3   8
#> 5  15.2 275.8 180 3.07 3.780 18.00  0  0    3    3   8
#> 6  10.4 472.0 205 2.93 5.250 17.98  0  0    3    4   8
#> 7  10.4 460.0 215 3.00 5.424 17.82  0  0    3    4   8
#> 8  14.7 440.0 230 3.23 5.345 17.42  0  0    3    4   8
#> 9  15.5 318.0 150 2.76 3.520 16.87  0  0    3    2   8
#> 10 15.2 304.0 150 3.15 3.435 17.30  0  0    3    2   8
#> 11 13.3 350.0 245 3.73 3.840 15.41  0  0    3    4   8
#> 12 19.2 400.0 175 3.08 3.845 17.05  0  0    3    2   8
#> 13 15.8 351.0 264 4.22 3.170 14.50  0  1    5    4   8
#> 14 15.0 301.0 335 3.54 3.570 14.60  0  1    5    8   8
#> 15 22.8 108.0  93 3.85 2.320 18.61  1  1    4    1   4
#> 16 24.4 146.7  62 3.69 3.190 20.00  1  0    4    2   4
#> 17 22.8 140.8  95 3.92 3.150 22.90  1  0    4    2   4
#> 18 32.4  78.7  66 4.08 2.200 19.47  1  1    4    1   4
#> 19 30.4  75.7  52 4.93 1.615 18.52  1  1    4    2   4
#> 20 33.9  71.1  65 4.22 1.835 19.90  1  1    4    1   4
#> 21 21.5 120.1  97 3.70 2.465 20.01  1  0    3    1   4
#> 22 27.3  79.0  66 4.08 1.935 18.90  1  1    4    1   4
#> 23 26.0 120.3  91 4.43 2.140 16.70  0  1    5    2   4
#> 24 30.4  95.1 113 3.77 1.513 16.90  1  1    5    2   4
#> 25 21.4 121.0 109 4.11 2.780 18.60  1  1    4    2   4
#> 26 21.0 160.0 110 3.90 2.620 16.46  0  1    4    4   6
#> 27 21.0 160.0 110 3.90 2.875 17.02  0  1    4    4   6
#> 28 21.4 258.0 110 3.08 3.215 19.44  1  0    3    1   6
#> 29 18.1 225.0 105 2.76 3.460 20.22  1  0    3    1   6
#> 30 19.2 167.6 123 3.92 3.440 18.30  1  0    4    4   6
#> 31 17.8 167.6 123 3.92 3.440 18.90  1  0    4    4   6
#> 32 19.7 145.0 175 3.62 2.770 15.50  0  1    5    6   6
#>                                                  filename
#> 1  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 2  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 3  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 4  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 5  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 6  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 7  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 8  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 9  /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 10 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 11 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 12 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 13 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 14 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=8/part-0.parquet
#> 15 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 16 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 17 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 18 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 19 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 20 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 21 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 22 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 23 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 24 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 25 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=4/part-0.parquet
#> 26 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet
#> 27 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet
#> 28 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet
#> 29 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet
#> 30 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet
#> 31 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet
#> 32 /tmp/RtmpzzDK4v/file32c352040ed57/cyl=6/part-0.parquet

# this doesn't - as it tries to print it
open_dataset(tf) %>%
  mutate(filename = add_filename())
#> Error in schm$GetFieldByName(name)$type$ToString(): attempt to apply non-function

# if we try it on a table, we get an error message - 
# I can look to catch this and raise an error with more context
arrow_table(mtcars) %>% mutate(filename = add_filename()) %>% collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> /home/nic2/arrow/cpp/src/arrow/type.h:1800  CheckNonEmpty(matches, root)
#> /home/nic2/arrow/cpp/src/arrow/compute/exec/expression.cc:429  ref->FindOne(in)
#> /home/nic2/arrow/cpp/src/arrow/compute/exec/project_node.cc:67  expr.Bind(*inputs[0]->output_schema(), plan->exec_context())

# same error as with the dataset - "attempt to apply non-function"
arrow_table(mtcars) %>% mutate(filename = add_filename())
#> Error in schm$GetFieldByName(name)$type$ToString(): attempt to apply non-function
```

@nealrichardson (Member)

Re: the print method, you probably need more special casing here. The other place that may make assumptions about FieldRefs is implicit_schema(), which is called when a query is collapsed. So you may want to add some tests that do aggregation, or joins, or head/tail, etc., as these are all cases that would involve implicit_schema (iirc).
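
A sketch of the kind of test this suggests: collapse the query after add_filename() via an aggregation, which goes through implicit_schema(). This is illustrative only, not the exact tests added in the PR:

```r
library(arrow)
library(dplyr)

tf <- tempfile()
dir.create(tf)
write_dataset(mtcars, tf, partitioning = "cyl")

# Aggregating after add_filename() collapses the query, which calls
# implicit_schema(); the augmented column has to survive that step.
open_dataset(tf) %>%
  mutate(file = add_filename()) %>%
  group_by(file) %>%
  summarise(n = n()) %>%
  collect()
```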

Re: handling for non-Datasets, ideally you'd catch that when add_filename() is called. You should be able to find .data somewhere in the env stack and inspect it.

@thisisnic (Member Author)

> Re: handling for non-Datasets, ideally you'd catch that when add_filename() is called. You should be able to find .data somewhere in the env stack and inspect it.

@nealrichardson I'm a bit lost as to how I'd expect .data to look different between a Table and a Dataset. I've managed to access it via `caller_env()$.data`, but the contents look the same either way: a named list of Expressions.

@nealrichardson (Member)

> > Re: handling for non-Datasets, ideally you'd catch that when add_filename() is called. You should be able to find .data somewhere in the env stack and inspect it.
>
> @nealrichardson I'm a bit lost as to how I'd expect .data to look different between a Table and a Dataset. I've managed to access it via `caller_env()$.data`, but the contents look the same either way: a named list of Expressions.

Not that .data; the .data that it is generated from in arrow_mask(). I was thinking along the lines of what I proposed in https://issues.apache.org/jira/browse/ARROW-13186, which is (as you can see) not implemented. One idea would be to poke a magic variable into the data mask that says "I am a dataset", or attach an attribute to that .data to that effect, and you could find it here. But we can defer that; maybe make a TODO JIRA for it?
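
A hypothetical sketch of that idea; none of these names are arrow's actual arrow_mask() internals. The point is just that the environment backing the data mask can carry a marker that functions evaluated inside the mask can check:

```r
library(arrow)
library(rlang)

# Illustrative only: build a data mask whose backing environment records
# whether the underlying data is a Dataset, so add_filename() can error
# early on Tables instead of failing later at collect().
build_mask <- function(.data) {
  env <- new_environment()
  env$.query_on_dataset <- inherits(.data, "Dataset")
  env$add_filename <- function() {
    if (!env$.query_on_dataset) {
      abort("add_filename() can only be used with Dataset objects.")
    }
    Expression$field_ref("__filename")
  }
  new_data_mask(env)
}

# eval_tidy(quote(add_filename()), build_mask(arrow_table(mtcars)))
# errors immediately; with a Dataset it returns the field ref.
```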

@thisisnic thisisnic force-pushed the ARROW-15260_filenames branch 3 times, most recently from c3b5774 to df817b2 Compare July 26, 2022 20:04
@thisisnic (Member Author)

This still isn't done, as the tests I added don't hit the implicit_schema() path.

@thisisnic (Member Author)

> Re: handling for non-Datasets, ideally you'd catch that when add_filename() is called. You should be able to find .data somewhere in the env stack and

I've opened ARROW-17356 as a follow-up.

@nealrichardson (Member) left a comment


A suggestion about the error message but otherwise LGTM, thanks for persisting with this!


```r
# this hits the implicit_schema path by joining afterwards
join_after <- ds %>%
mutate(file = add_filename()) %>%
```

@nealrichardson (Member): Indentation here and on the next example is off.

r/R/util.R (outdated), comment on lines 242 to 244:

```r
"Augmented fields such as 'filename' must",
"only be used with with Dataset objects which have",
"not been aggregated or joined."
```

@nealrichardson (Member): Wordsmithing here, how about something like "'filename' can only be used with Dataset objects, and it can only be added before doing an aggregation or a join"?

@thisisnic (Member Author):
That's a lot clearer, thank you! I've updated the error to incorporate both that and the two different ways of referring to the __filename variable.

@nealrichardson nealrichardson merged commit 8386871 into apache:master Aug 10, 2022
@ursabot commented Aug 10, 2022

Benchmark runs are scheduled for baseline = b3116fa and contender = 8386871. 8386871 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️2.28% ⬆️0.03%] test-mac-arm
[Failed ⬇️25.54% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.21% ⬆️0.11%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 83868717 ec2-t3-xlarge-us-east-2
[Failed] 83868717 test-mac-arm
[Failed] 83868717 ursa-i9-9960x
[Finished] 83868717 ursa-thinkcentre-m75q
[Finished] b3116fa3 ec2-t3-xlarge-us-east-2
[Finished] b3116fa3 test-mac-arm
[Finished] b3116fa3 ursa-i9-9960x
[Finished] b3116fa3 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot commented Aug 10, 2022

['Python', 'R'] benchmarks have a high level of regressions:
test-mac-arm
ursa-i9-9960x

@vspinu commented Aug 29, 2022

After this commit I see a 10x memory usage increase and 5x slower compute times in my application. Instead of 400MB, I am hitting 4GB now. 🙄

Shouldn't this feature have no effect whatsoever unless add_filename() is explicitly requested?

@nealrichardson (Member)

The change triggered ARROW-17556; it will get fixed there.
