
ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators #6365

Closed
wants to merge 7 commits

Conversation

nealrichardson (Member)

As an alternative to calling ToTable() to bring everything into memory, it would be nice to expose the stream of batches so that you could aggregate (or really do whatever) on each chunk. That gives access to the full dataset, which you otherwise can't work with unless it's small.

On the NYC taxi dataset (10.5 years, 125 parquet files),

tab <- ds %>%
  select(passenger_count) %>%
  map_batches(~count(., passenger_count)) %>%
  group_by(passenger_count) %>%
  summarize(n = sum(n))

gives me the tabulation of passenger_count in about 200s (no parallelization). And you can see all sorts of weird features in the data:

> as.data.frame(tab)
   passenger_count          n
1             -127          7
2             -123          1
3             -122          1
4             -119          1
5             -115          1
6             -101          1
7              -98          1
8              -96          1
9              -93          1
10             -92          1
11             -91          1
12             -79          1
13             -64          2
14             -63          1
15             -48       1508
16             -45          1
17             -43          4
18             -33          1
19             -31          1
20              -9          1
21              -7          1
22              -6          3
23              -2          1
24              -1         10
25               0    5809809
26               1 1078624900
27               2  227454966
28               3   67096194
29               4   32443710
30               5   99064441
31               6   37241244
32               7       1753
33               8       1437
34               9       1304
35              10         17
36              13          1
37              15          2
38              17          1
39              19          1
40              25          1
41              33          2
42              34          1
43              36          1
44              37          1
45              38          1
46              47          1
47              49         26
48              53          1
49              58          2
50              61          1
51              65          3
52              66          1
53              69          1
54              70          1
55              84          1
56              96          1
57              97          1
58             113          1
59             125          1

github-actions bot commented Feb 5, 2020

Thanks for opening a pull request!

Could you open an issue for this pull request on JIRA?
https://issues.apache.org/jira/browse/ARROW

Then could you also rename the pull request title in the following format?

ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}


fsaintjacques (Contributor)

The negative numbers are because the proper type is uint8_t: the values show up as negative when the passenger count is greater than 127 (an unsigned byte value of 208, for example, reads back as 208 - 256 = -48 when interpreted as signed). I doubt that this column (or any of them) is reliable.

nealrichardson marked this pull request as ready for review on April 8, 2020
nealrichardson changed the title from "WIP: expose an interface to ScanTask/RecordBatch iterators in R" to "ARROW-8376: [R] Add experimental interface to ScanTask/RecordBatch iterators" on Apr 8, 2020
nealrichardson (Member, Author)

Any objection to merging this, @fsaintjacques? I don't plan on advocating its use, but I thought it might be useful to have in the package for experimenting and exploring things.

#' * `projection`: A character vector of column names to select
#' * `filter`: An `Expression` to filter the scanned rows by, or `TRUE` (default)
#' to keep all rows.
#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
fsaintjacques (Contributor)
Should use_threads default to option(arrow.use_threads), for consistency with the rest of the API?

nealrichardson (Member, Author)

Perhaps so, though at least these threads should be safer because they're in the C++ library and not the R bindings. I can make this change in my current PR though.
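
For illustration, a minimal sketch of how these scanner arguments (and the suggested option-based default) could look from R. The Scanner$create() constructor, the $ToTable() method, and the arrow.use_threads option name are assumptions here, not necessarily the exact surface this PR adds:

library(arrow)

ds <- open_dataset("nyc-taxi/")   # hypothetical dataset directory
scan <- Scanner$create(
  ds,
  projection = "passenger_count",                      # columns to read
  filter = TRUE,                                        # keep all rows (the default)
  use_threads = getOption("arrow.use_threads", TRUE)    # suggested option-based default
)
tab <- scan$ToTable()   # materializes everything; the batch iterator avoids this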

auto it = VALUE_OR_STOP(scanner->Scan());
std::vector<std::shared_ptr<ds::ScanTask>> out;
std::shared_ptr<ds::ScanTask> scan_task;
// TODO(npr): can this iteration be parallelized?
fsaintjacques (Contributor)

It can, but it's a hazard: each ScanTask can be attached to an open file descriptor, so you may bust limits if you collect them all before aggregating them. That's why you want to consume them immediately, so that you control the number of resources in flight.
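
As a rough sketch of that "consume them immediately" pattern from R: reduce each batch to a small summary as it arrives, so only one task's resources (e.g. its open file) are live at a time. The Scan() and Execute() method names follow the C++ API and are assumptions about the R interface; scan is the Scanner from the sketch above:

counts <- list()
for (task in scan$Scan()) {         # iterate ScanTasks one at a time
  for (batch in task$Execute()) {   # materialize only this task's RecordBatches
    # reduce each batch to a plain R object right away
    counts[[length(counts) + 1]] <- table(as.vector(batch$passenger_count))
  }
}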
