[R] open_dataset - add file_name as column #30754

Closed
asfimport opened this issue Jan 5, 2022 · 14 comments


asfimport commented Jan 5, 2022

Hi. Is it possible to add the file_name as a column to a dataset?

ds <- open_dataset(.....)
list_of_files <- ds$files

This works, but I need the file_name as a column.
Thanks

 

Reporter: Martin du Toit / @martindut
Assignee: Nicola Crane / @thisisnic

Note: This issue was originally created as ARROW-15260. Please see the migration documentation for further details.


Martin du Toit / @martindut:
I already add the folders as partitions, but I also need the file_name as a column so that I can group_by it and add row numbers.


Nicola Crane / @thisisnic:
I don't think that is possible, but could you give me a bit more information about the problem you're trying to solve, or why you need it? There may be alternative ways of achieving your goals.


Martin du Toit / @martindut:
The raw data that we receive from clients is structured into various folders. We partition the data based on the folders. The lowest level is a timestamp folder, but there are scenarios where we receive multiple files for a specific timestamp. In order to process the data in bulk, we need to create a unique row-level id for each file, i.e. group_by various partitions and file_name to add a row_number. If we pick up any issues with the data, we need to be able to pinpoint the exact file where the issue occurred so that we can get back to the client.

I hope this makes sense


Nicola Crane / @thisisnic:
Yeah, that does make sense. As a short-term (and not particularly scalable) solution, I suppose you could do something like:

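# `directory` is the root folder that holds the raw CSV files
files <- list.files(directory, recursive = TRUE, full.names = TRUE)

for (file in files) {
  data <- read_csv_arrow(file)
  data <- mutate(data, filename = file)
  # note: this overwrites each raw file in place with the extra column added
  write_csv_arrow(data, file)
}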

I also wonder if the code required to solve ARROW-14612 might bring us closer to this being possible. Would be good to hear others' thoughts here.


Weston Pace / @westonpace:
From a C++ perspective we've got many of the pieces needed already. One challenge is that the datasets API is written to work with "fragments" and not "files". For example, a dataset might be an in-memory table in which case we are working with InMemoryFragment and not FileFragment so there is no concept of "filename".

That being said, the low level ScanBatchesAsync method actually returns a generator of TaggedRecordBatch for this very purpose. A TaggedRecordBatch is a struct with the record batch as well as the source fragment for that record batch.

So if you were to execute scan, you could inspect the fragment and, if it is a FileFragment, you could extract the filename.

Another challenge is that R is moving towards more and more access through an exec plan and not directly using a scanner. In order for that to work we would need to augment the scan results with the filename in C++ before sending into the exec plan. Luckily, we already do this a bit as well. We currently augment the scan results with fragment index, batch index, and whether the batch is the last batch in the fragment.

Since ExecBatch can work with constants efficiently I don't think there will be much performance cost in always including the filename. So the work remaining is simply to add a new augmented field __fragment_source_name which is always attached if the underlying fragment is a FileFragment. Then users can get this field if they want by including "__fragment_source_name" in the list of columns they query for.


Martin du Toit / @martindut:
Thanks for the feedback @westonpace, although I must admit it is a little bit Greek to me. Would it be possible for someone to give me an example of how to do this FileFragment scan in R? @thisisnic


Nicola Crane / @thisisnic:
Hi @martindut - I think this is C++ functionality that is not exposed directly in the R layer, so this wouldn't be possible from R right now. I will open a ticket for enabling the functionality that Weston mentions for extracting filenames though.


Nicola Crane / @thisisnic:
This is now implemented in C++ via ARROW-15281; however, it'll need some work to bring this functionality into the R code, and that could be a bit tricky. Here are my notes on what I think the tricky bits will be.

In Python, we can do something like scanner = dataset_reader.scanner(dataset, columns=['__filename'])
In the R code, the Scanner object looks like the likely analog. We use Scanner$create() to create a Scanner object, and it uses the projection field to specify columns.

In the body of Scanner$create(), we have this code: proj <- c(dataset$selected_columns, dataset$temp_columns) and then later stopifnot("attempting to project with unknown columns" = all(projection %in% names(proj)))

So we'll need to make some sort of change so that we can select this "metadata" kind of column.

It may be complicated further by the fact that this deviates a bit from the usual way of using dplyr::select(); i.e. if I set up a basic dataset based on the mtcars dataset and try to call select(mtcars_dataset, cyl, __filenames), I get Error: unexpected input in "select(my_dataset, cyl, _" which is different from the usual Can't subset columns that don't exist. error message I might expect, so there may be something around the syntax here too.


Dewey Dunnington / @paleolimbot:
(JIRA does wild things with italics here, so I'm sticking it in noformat...)



It looks like we can access __filename from the Scanner too, although what we can do with it is pretty limited. Note that in R you will have to use backticks in something like dplyr (e.g., `__filename`), because variable names in R can't start with _. In the dplyr interface we make a pretty strong assumption that the schema names are the available names in the dataset. Maybe the best way would be to add a binding like dataset_filename() that inserts the correct field reference (although C++ gives us errors if we try to insert a field reference to __filename in an Expression).

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

tf <- tempfile()
write_dataset(mtcars, tf, partitioning = "cyl")
ds <- open_dataset(tf)

# works!
scanner <- Scanner$create(
  open_dataset(tf), 
  projection = c("__filename", names(ds))
)

as_tibble(scanner$ToTable())
#> # A tibble: 32 × 12
#>    `__filename`        mpg  disp    hp  drat    wt  qsec    vs    am  gear  carb
#>    <chr>             <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#>  1 /private/var/fol…  22.8 108      93  3.85  2.32  18.6     1     1     4     1
#>  2 /private/var/fol…  24.4 147.     62  3.69  3.19  20       1     0     4     2
#>  3 /private/var/fol…  22.8 141.     95  3.92  3.15  22.9     1     0     4     2
#>  4 /private/var/fol…  32.4  78.7    66  4.08  2.2   19.5     1     1     4     1
#>  5 /private/var/fol…  30.4  75.7    52  4.93  1.62  18.5     1     1     4     2
#>  6 /private/var/fol…  33.9  71.1    65  4.22  1.84  19.9     1     1     4     1
#>  7 /private/var/fol…  21.5 120.     97  3.7   2.46  20.0     1     0     3     1
#>  8 /private/var/fol…  27.3  79      66  4.08  1.94  18.9     1     1     4     1
#>  9 /private/var/fol…  26   120.     91  4.43  2.14  16.7     0     1     5     2
#> 10 /private/var/fol…  30.4  95.1   113  3.77  1.51  16.9     1     1     5     2
#> # … with 22 more rows, and 1 more variable: cyl <int>

# seems that we still can't use __filename in a filter expr
Scanner$create(
  open_dataset(tf),
  projection = c("__filename", names(ds)),
  filter = Expression$create(
    "match_substring",
    Expression$field_ref("__filename"),
    options = list(pattern = "cyl=8")
  )
)
#> Error: Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> cyl: int32
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/type.h:1717  CheckNonEmpty(matches, root)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/dataset/scanner.cc:782  ref.FindOne(*scan_options_->dataset_schema)


Nicola Crane / @thisisnic:
Oh sweet, at least we're closer than I'd thought we were, and it works for the basic use-case here (i.e. find out which file this row belongs to).


Neal Richardson / @nealrichardson:

> although C++ gives us errors if we try to insert a field reference to __filename in an Expression

This is tricky because we (and apparently elsewhere in the C++ code) have logic to filter out secret internal columns like this: https://github.com/apache/arrow/blob/master/r/R/query-engine.R#L159-L163. Sounds like we need to find a safe way to loosen that, or otherwise rethink the implementation.

In terms of UX in R, a special helper like add_filenames <- function() Expression$field_ref("__filename") that you could call like mutate(ds, file_col = add_filenames()) might be a reasonable interface to this.


Weston Pace / @westonpace:

> seems that we still can't use __filename in a filter expr

That's a C++ problem (just filed ARROW-16115)

> This is tricky because we (and apparently elsewhere in the C++ code) have logic to filter out secret internal columns like this

In C++ if the user doesn't specify any projection we default to "all columns but not augmented columns". I think that's the only time we filter out these special columns and I think we want to keep this interpretation.


Weston Pace / @westonpace:
Oh, also, one might think that this query would push the filter down:


  filter = Expression$create(
    "match_substring",
    Expression$field_ref("__filename"),
    options = list(pattern = "cyl=8")
  )

In other words, you might think we would get the hint and only read files matching that pattern. This is not the case. We will read the entire dataset and apply the "cyl=8" filter in memory.

If we want to push down filters on the filename column we will need to add some special logic. Feel free to create a JIRA.
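
Until such pushdown exists, one way to avoid reading every file is to pick the files yourself before opening the dataset. The following is only a minimal sketch, not something proposed in this thread: it reuses ds$files from the original question and assumes that open_dataset() is given a character vector of file paths. The object names cyl8_files and ds_cyl8 are made up for illustration, and whether the partition column is still inferred from the paths in this mode is not relied upon here.

```r
library(arrow, warn.conflicts = FALSE)

# Same toy dataset as in the example above: one Parquet file per cyl value.
tf <- tempfile()
write_dataset(mtcars, tf, partitioning = "cyl")
ds <- open_dataset(tf)

# Select file paths by pattern before any data is read.
# (cyl8_files is a hypothetical name used only in this sketch.)
cyl8_files <- ds$files[grepl("cyl=8", ds$files, fixed = TRUE)]

# Open a dataset over just those files, so the other files are never scanned.
ds_cyl8 <- open_dataset(cyl8_files)
nrow(ds_cyl8)
```

This trades the convenience of a filter expression for plain string matching on paths, so it only helps when the file selection can be decided from the path alone.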


Neal Richardson / @nealrichardson:
Issue resolved by pull request 12826
#12826
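
For readers landing here later, a rough sketch of how the original request (a per-file row id) might look with the helper that, as far as I know, shipped in the arrow R package as add_filename(). That name does not appear in this thread, so treat it as an assumption and check that your installed version provides it. The column names file_name and row_id are made up for illustration, and row_number() is applied after collect() because it runs in dplyr rather than in Arrow.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Toy dataset: one Parquet file per cyl value.
tf <- tempfile()
write_dataset(mtcars, tf, partitioning = "cyl")

with_ids <- open_dataset(tf) %>%
  # add_filename() (assumed available) is called on the dataset query itself,
  # before collect(), so that each row is tagged with its source file.
  mutate(file_name = add_filename()) %>%
  collect() %>%
  # From here on this is an ordinary data frame, so plain dplyr applies.
  group_by(file_name) %>%
  mutate(row_id = row_number()) %>%
  ungroup()
```

Because the numbering happens after collect(), this pulls the selected data into memory; for large datasets it may be worth filtering or selecting columns before that step.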
