[R] Update function examples to show how to open CSV dataset with partitioning and a schema #32938

Open
Tracked by #33520
asfimport opened this issue Sep 13, 2022 · 8 comments

Comments


I feel like this might be a duplicate of a previous ticket, but I can't find it.

library(dplyr)
library(arrow)

# all good!
tf <- tempfile()
dir.create(tf)
write_dataset(mtcars, tf, format = "csv")
open_dataset(tf, format = "csv") %>% collect()
#> # A tibble: 32 × 11
#>      mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#>    <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>
#>  1  21       6  160    110  3.9   2.62  16.5     0     1     4     4
#>  2  21       6  160    110  3.9   2.88  17.0     0     1     4     4
#>  3  22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
#>  4  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
#>  5  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
#>  6  18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
#>  7  14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
#>  8  24.4     4  147.    62  3.69  3.19  20       1     0     4     2
#>  9  22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
#> 10  19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
#> # … with 22 more rows

# all good
tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, cyl), tf, format = "csv")
open_dataset(tf, format = "csv") %>% collect()
#> # A tibble: 32 × 11
#>      mpg  disp    hp  drat    wt  qsec    vs    am  gear  carb   cyl
#>    <dbl> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int> <int>
#>  1  22.8 108      93  3.85  2.32  18.6     1     1     4     1     4
#>  2  24.4 147.     62  3.69  3.19  20       1     0     4     2     4
#>  3  22.8 141.     95  3.92  3.15  22.9     1     0     4     2     4
#>  4  32.4  78.7    66  4.08  2.2   19.5     1     1     4     1     4
#>  5  30.4  75.7    52  4.93  1.62  18.5     1     1     4     2     4
#>  6  33.9  71.1    65  4.22  1.84  19.9     1     1     4     1     4
#>  7  21.5 120.     97  3.7   2.46  20.0     1     0     3     1     4
#>  8  27.3  79      66  4.08  1.94  18.9     1     1     4     1     4
#>  9  26   120.     91  4.43  2.14  16.7     0     1     5     2     4
#> 10  30.4  95.1   113  3.77  1.51  16.9     1     1     5     2     4
#> # … with 22 more rows
list.files(tf)
#> [1] "cyl=4" "cyl=6" "cyl=8"

# hive_style = FALSE leads to no `cyl` column, which makes sense
tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, cyl), tf, format = "csv", hive_style = FALSE)
open_dataset(tf, format = "csv") %>% collect()
#> # A tibble: 32 × 10
#>      mpg  disp    hp  drat    wt  qsec    vs    am  gear  carb
#>    <dbl> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>
#>  1  22.8 108      93  3.85  2.32  18.6     1     1     4     1
#>  2  24.4 147.     62  3.69  3.19  20       1     0     4     2
#>  3  22.8 141.     95  3.92  3.15  22.9     1     0     4     2
#>  4  32.4  78.7    66  4.08  2.2   19.5     1     1     4     1
#>  5  30.4  75.7    52  4.93  1.62  18.5     1     1     4     2
#>  6  33.9  71.1    65  4.22  1.84  19.9     1     1     4     1
#>  7  21.5 120.     97  3.7   2.46  20.0     1     0     3     1
#>  8  27.3  79      66  4.08  1.94  18.9     1     1     4     1
#>  9  26   120.     91  4.43  2.14  16.7     0     1     5     2
#> 10  30.4  95.1   113  3.77  1.51  16.9     1     1     5     2
#> # … with 22 more rows
list.files(tf)
#> [1] "4" "6" "8"


# *but* if we try to add it in via a schema, it doesn't work

desired_schema <- schema(mpg = float64(), disp = float64(), hp = int64(), drat = float64(), 
    wt = float64(), qsec = float64(), vs = int64(), am = int64(), 
    gear = int64(), carb = int64(), cyl = int64())

tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, cyl), tf, format = "csv", hive_style = FALSE)
open_dataset(tf, format = "csv", schema = desired_schema) %>% collect()
#> Error in `dplyr::collect()`:
#> ! Invalid: Could not open CSV input source '/tmp/RtmpnInOwc/file13f0d38c5b994/4/part-0.csv': Invalid: CSV parse error: Row #1: Expected 11 columns, got 10: "mpg","disp","hp","drat","wt","qsec","vs","am","gear","carb"
#> /home/nic2/arrow/cpp/src/arrow/csv/parser.cc:477  (ParseLine<SpecializedOptions, false>(values_writer, parsed_writer, data, data_end, is_final, &line_end, bulk_filter))
#> /home/nic2/arrow/cpp/src/arrow/csv/parser.cc:566  ParseChunk<SpecializedOptions>( &values_writer, &parsed_writer, data, data_end, is_final, rows_in_chunk, &data, &finished_parsing, bulk_filter)
#> /home/nic2/arrow/cpp/src/arrow/csv/reader.cc:426  parser->ParseFinal(views, &parsed_size)
#> /home/nic2/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:573  iterator_.Next()
#> /home/nic2/arrow/cpp/src/arrow/record_batch.cc:337  ReadNext(&batch)
#> /home/nic2/arrow/cpp/src/arrow/record_batch.cc:351  ToRecordBatches()
list.files(tf)
#> [1] "4" "6" "8"

If I pass a schema which does not include the partitioning column, it works, but the partitioning column is lost:

# this works, but without the partitioning column
desired_schema <- schema(mpg = float64(), disp = float64(), hp = int64(), drat = float64(), 
    wt = float64(), qsec = float64(), vs = int64(), am = int64(), 
    gear = int64(), carb = int64())

tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, cyl), tf, format = "csv", hive_style = FALSE)
open_dataset(tf, format = "csv", schema = desired_schema, skip = 1) %>% collect()

Reporter: Nicola Crane / @thisisnic

Note: This issue was originally created as ARROW-17700. Please see the migration documentation for further details.

Nicola Crane / @thisisnic:
This is a duplicate of ARROW-14743, which was previously resolved.

Nicola Crane / @thisisnic:
Would y'all mind not looking into this too much right now? I want to level up my C++ and am trying to work it out on my own!

Neal Richardson / @nealrichardson:
Also sounds like ARROW-15879

Nicola Crane / @thisisnic:
Right, totally stuck here. It looks like there is a variable `batch_.num_cols_` in `arrow/cpp/src/arrow/csv/parser.cc` which is set to the number of fields in the schema, which is why it's complaining that the column count is too high (the data being read in is partitioned, so each file has one fewer column than expected). But I'm struggling to work out whether the fix belongs in the C++, or in what R passes to the C++ that should set this variable.

Nicola Crane / @thisisnic:
CC @westonpace

Nicola Crane / @thisisnic:
I've been looking in the `parser.cc` file mentioned in the error message, and I think the problem is that it uses the number of fields in the schema to set `batch_.num_cols_`, instead of however that variable gets set when we don't manually pass a schema in. Because the schema contains the partitioning column, and that variable is used to work out how much data to read, the "Expected 11 columns, got 10" error is triggered.

Nicola Crane / @thisisnic:
FWIW, in case anyone else comes up against this issue: after some help from @westonpace, we worked out that to read in the partitioning column successfully, you have to pass the column in as a schema via the partitioning argument:

tf <- tempfile()
dir.create(tf)
write_dataset(mtcars, tf, partitioning = "cyl", hive_style = FALSE)
open_dataset(tf, partitioning = schema(cyl = int64())) %>% collect()
#>     mpg  disp  hp drat    wt  qsec vs am gear carb cyl
#> 1  22.8 108.0  93 3.85 2.320 18.61  1  1    4    1   4
#> 2  24.4 146.7  62 3.69 3.190 20.00  1  0    4    2   4
#> 3  22.8 140.8  95 3.92 3.150 22.90  1  0    4    2   4
...

Nicola Crane / @thisisnic:
We should add an example to the docs for open_dataset() which shows the correct way to do this: passing a schema to both the schema and partitioning arguments.
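For reference, a sketch of what such a docs example might look like for the CSV case, combining the pieces above: the file-only schema plus `skip = 1` from the earlier workaround, and the partitioning-column-as-schema trick. Whether `schema` and `partitioning` compose this way for non-hive CSV datasets is exactly what the docs example needs to confirm, so treat this as a starting point rather than verified behavior:

```r
library(arrow)
library(dplyr)

tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, cyl), tf, format = "csv", hive_style = FALSE)

# `schema` describes only the columns physically present in the CSV files;
# the partitioning column is supplied separately as a one-field schema.
file_schema <- schema(
  mpg = float64(), disp = float64(), hp = int64(), drat = float64(),
  wt = float64(), qsec = float64(), vs = int64(), am = int64(),
  gear = int64(), carb = int64()
)

ds <- open_dataset(
  tf,
  format = "csv",
  schema = file_schema,
  partitioning = schema(cyl = int64()),
  skip = 1  # the files still have a header row, which the explicit schema replaces
)
collect(ds)
```

The key design point the example should make explicit is that the two arguments have disjoint jobs: `schema` must match the file contents exactly, while `partitioning` describes columns that exist only in the directory names.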
