[R] How to filter array columns? #31991

Closed · asfimport opened this issue May 24, 2022 · 6 comments

asfimport commented May 24, 2022

In the parquet data we have, there is a column with the array data type (`list<array_element>`) which flags records that have different issues. For each record, multiple values could be stored in the column, for example `[A, B, C]`.

I'm trying to perform a data filtering step and exclude some flagged records.

Filtering is trivial for the regular columns that contain just a single value. E.g.,

flags_to_exclude <- c("A", "B")
datt %>% filter(! col %in% flags_to_exclude)

Given the array column, is it possible to exclude records with at least one of the flags from flags_to_exclude using the arrow R package?

I really appreciate any advice you can provide!

Reporter: Vladimir
Assignee: Will Jones / @wjones127


Note: This issue was originally created as ARROW-16641. Please see the migration documentation for further details.

Will Jones / @wjones127:
I don't think there's a compute function that does what you want directly, but you should be able to achieve this by flattening the list, doing the filter, and aggregating on the indices to get a filter vector. 

Is the following example helpful?

library(arrow)
library(dplyr)

# Filter `tab` to rows where `tab$x` contains a value in `valid`
valid <- Array$create(c(2))

tab <- arrow_table(
  x = Array$create(list(c(1, 2), c(3, 2), c(1, 3))),
  y = Array$create(c("a", "b", "c"))
)

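# list_parent_indices() gives, for each flattened element, the 0-based
# index of the record it came from; list_flatten() strips one level of
# list nesting.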
tab_exploded <- arrow_table(
  i = call_function("list_parent_indices", tab$x),
  x_flat = call_function("list_flatten", tab$x)
)

to_keep <- tab_exploded %>%
  group_by(i) %>%
  summarise(keep = any(x_flat %in% valid)) %>%
  compute() %>%
  .$keep

res <- tab[to_keep,]
as_tibble(res)
#> # A tibble: 2 × 2
#>                x y    
#>   <list<double>> <chr>
#> 1            [2] a    
#> 2            [2] b
res$x
#> ChunkedArray
#> [
#>   [
#>     [
#>       1,
#>       2
#>     ],
#>     [
#>       3,
#>       2
#>     ]
#>   ]
#> ]

Will Jones / @wjones127:
Oh correction: to flatten a chunked array, you should use 

  x_flat = call_function("list_flatten", tab$x)
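
For context, a minimal sketch of the difference (reusing `tab` from the example above): `tab$x` is a ChunkedArray, so the per-chunk accessor only reaches a single chunk, while the compute function flattens every chunk.

tab$x$chunk(0)$values()                  # flattens only the first chunk
call_function("list_flatten", tab$x)     # flattens the whole ChunkedArray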

Vladimir:
Thank you very much for the comprehensive answer! It's very helpful!

As I understand it, list_parent_indices discards records without any flags (`NULL` values). Is it possible to keep the nulls? I've tried to specify the options (`skip_nulls = "false"` or `skip_nulls = FALSE`), but it doesn't have any effect.

tab <- arrow_table(
  x = Array$create(list(c(1, 2), 1, NULL)),
  y = Array$create(c("a", "b", "c"))
)
tab_exploded <- arrow_table(
  i = call_function("list_parent_indices", tab$x, options = list(skip_nulls = "false")),
  x_flat = call_function("list_flatten", tab$x)
)
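
For reference, this is what the parent indices look like for the table above (illustrative output; the third record, the `NULL` one, contributes no index at all):

as.vector(tab_exploded$i)
#> [1] 0 0 1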

Will Jones / @wjones127:
Yes, that option doesn't seem to do anything here. I think the semantics are a little tough to design for that: list_parent_indices returns one index per value in the subarray, so if there are zero values, should there not be zero indices?

It does make this set filtering more awkward, though; this probably deserves its own function.

A different approach might be constructing the indices to exclude:

library(arrow)
library(dplyr)

# Filter `tab` to rows where `tab$x` doesn't contain any value in `exclude`
exclude <- Array$create(c(1))

tab <- arrow_table(
  x = Array$create(list(c(1, 2), c(3, 2), c(), c(1, 3))),
  y = Array$create(c("a", "b", "c", "d"))
)

tab_exploded <- arrow_table(
  i = call_function("list_parent_indices", tab$x),
  x_flat = call_function("list_flatten", tab$x)
  #x_flat = tab$x$chunk(0)$values()
)

to_drop <- tab_exploded %>%
  group_by(i) %>%
  summarise(to_drop = any(x_flat %in% exclude)) %>%
  filter(to_drop) %>%
  compute() %>%
  .$i

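# `to_drop` holds 0-based Arrow indices, while R rows are 1-based, hence the + 1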
selection <- !(1:nrow(tab) %in% as.vector(to_drop + 1))
res <- tab[selection,]
as_tibble(res)
#> # A tibble: 2 × 2
#>                x y    
#>   <list<double>> <chr>
#> 1            [2] b    
#> 2                c
res$x
#> ChunkedArray
#> [
#>   [
#>     [
#>       3,
#>       2
#>     ],
#>     null
#>   ]
#> ]

Vladimir:
Will, thank you very much for your support!
The solution you proposed works very well.

However, there is an issue applying it to a real dataset. It looks like `call_function("list_parent_indices")` does not work with a `FileSystemDataset` (parquet files opened with `arrow::open_dataset()`). So to prepare an "exploded table", we need to load the full dataset into memory. And the "exploded table" becomes huge if there are multiple flags per record. So I'm running out of RAM very quickly (>100 GB).

I've also tried the other way to filter the records, using DuckDB + arrow with an SQL query of the form `SELECT * FROM my_table WHERE 'FlagA' != ANY (my_table.issue) AND 'FlagB' != ANY (my_table.issue)` (sketched below). But this approach is also very RAM-consuming.
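
A rough sketch of how that DuckDB route can be wired up from R (untested; the dataset path is a placeholder, and the query is the one above, verbatim):

library(arrow)
library(duckdb)
library(DBI)

ds <- open_dataset("path/to/occurrences")  # placeholder path
con <- dbConnect(duckdb::duckdb())
# to_duckdb() registers the Arrow dataset as a virtual DuckDB table
to_duckdb(ds, con = con, table_name = "my_table")

res <- dbGetQuery(con, "
  SELECT * FROM my_table
  WHERE 'FlagA' != ANY (my_table.issue)
    AND 'FlagB' != ANY (my_table.issue)
")

One caveat: with PostgreSQL-style quantifier semantics, `x != ANY (arr)` is true whenever some element differs from x, so `!= ALL` may be the quantifier that actually means "contains none of".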

So probably currently, there is no simple way to perform this filtering in one pass, and we need to split the data into chunks.

It would be pretty cool if one day arrow would have a built-in function to perform such a task on the fly!

PS. The dataset we are working on is openly available: it's the GBIF occurrence records. It has almost 2 billion records, and some of the records could have ~10 flags (column `issue`).

Will Jones / @wjones127:

> It would be pretty cool if one day arrow would have a built-in function to perform such a task on the fly!

Yes, I agree. I believe what you are looking for is the arrays_overlap() function in Spark or the && operator in PostgreSQL, so I've created a new ticket to implement that: https://issues.apache.org/jira/browse/ARROW-16702

> So to prepare an "exploded table", we need to load the full dataset into memory.

I'd also add that for cases like this, map_batches() will one day be a good solution. Right now it collects its results fully into memory, but we should eventually let it stream out the results. For now, it does process the inputs batch by batch, so it works well for reducing/aggregating.
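
For illustration, a rough sketch of what that could look like today (untested; it assumes the list column is named `issue`, the dataset path is a placeholder, and results are still collected into memory):

library(arrow)
library(dplyr)

exclude <- Array$create(c("FlagA", "FlagB"))

# Drop records whose `issue` list contains any excluded flag,
# processing one record batch at a time.
drop_flagged <- function(batch) {
  exploded <- arrow_table(
    i = call_function("list_parent_indices", batch$issue),
    flat = call_function("list_flatten", batch$issue)
  )
  to_drop <- exploded %>%
    group_by(i) %>%
    summarise(drop = any(flat %in% exclude)) %>%
    filter(drop) %>%
    compute() %>%
    .$i
  # 0-based Arrow indices -> 1-based R rows
  keep <- !(seq_len(nrow(batch)) %in% (as.vector(to_drop) + 1))
  batch[keep, ]
}

ds <- open_dataset("path/to/gbif_occurrences")  # placeholder path
res <- map_batches(ds, drop_flagged)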
