Skip to content

Commit

Permalink
ARROW-9219: [R] coerce_timestamps in Parquet write options does not work
Browse files Browse the repository at this point in the history
In addition to fixing the bug (a quick fix), I also spent some time deleting unnecessary bindings for parquet writer builder methods. There's more that can be done, which I think would shave more off of the built library size, but I don't have time to do it right now.

Closes #7550 from nealrichardson/r-fix-parquet

Authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
  • Loading branch information
nealrichardson committed Jun 26, 2020
1 parent b2c77ca commit 36b5a96
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 466 deletions.
52 changes: 2 additions & 50 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

178 changes: 75 additions & 103 deletions r/R/parquet.R
Expand Up @@ -174,51 +174,25 @@ write_parquet <- function(x,
invisible(x_out)
}


ParquetArrowWriterPropertiesBuilder <- R6Class("ParquetArrowWriterPropertiesBuilder", inherit = ArrowObject,
public = list(
store_schema = function() {
parquet___ArrowWriterProperties___Builder__store_schema(self)
self
},
set_int96_support = function(use_deprecated_int96_timestamps = FALSE) {
if (use_deprecated_int96_timestamps) {
parquet___ArrowWriterProperties___Builder__enable_deprecated_int96_timestamps(self)
} else {
parquet___ArrowWriterProperties___Builder__disable_deprecated_int96_timestamps(self)
}
self
},
set_coerce_timestamps = function(coerce_timestamps = NULL) {
if (!is.null(coerce_timestamps)) {
unit <- make_valid_time_unit(coerce_timestamps,
c("ms" = TimeUnit$MILLI, "us" = TimeUnit$MICRO)
)
parquet___ArrowWriterProperties___Builder__coerce_timestamps(unit)
}
self
},
set_allow_truncated_timestamps = function(allow_truncated_timestamps = FALSE) {
if (allow_truncated_timestamps) {
parquet___ArrowWriterProperties___Builder__allow_truncated_timestamps(self)
} else {
parquet___ArrowWriterProperties___Builder__disallow_truncated_timestamps(self)
}

self
}

)
)
ParquetArrowWriterProperties <- R6Class("ParquetArrowWriterProperties", inherit = ArrowObject)

ParquetArrowWriterProperties$create <- function(use_deprecated_int96_timestamps = FALSE, coerce_timestamps = NULL, allow_truncated_timestamps = FALSE) {
builder <- shared_ptr(ParquetArrowWriterPropertiesBuilder, parquet___ArrowWriterProperties___Builder__create())
builder$store_schema()
builder$set_int96_support(use_deprecated_int96_timestamps)
builder$set_coerce_timestamps(coerce_timestamps)
builder$set_allow_truncated_timestamps(allow_truncated_timestamps)
shared_ptr(ParquetArrowWriterProperties, parquet___ArrowWriterProperties___Builder__build(builder))
ParquetArrowWriterProperties$create <- function(use_deprecated_int96_timestamps = FALSE,
coerce_timestamps = NULL,
allow_truncated_timestamps = FALSE) {
if (is.null(coerce_timestamps)) {
timestamp_unit <- -1L # null sentinel value
} else {
timestamp_unit <- make_valid_time_unit(coerce_timestamps,
c("ms" = TimeUnit$MILLI, "us" = TimeUnit$MICRO)
)
}
shared_ptr(
ParquetArrowWriterProperties,
parquet___ArrowWriterProperties___create(
use_deprecated_int96_timestamps = isTRUE(use_deprecated_int96_timestamps),
timestamp_unit = timestamp_unit,
allow_truncated_timestamps = isTRUE(allow_truncated_timestamps)
)
)
}

valid_parquet_version <- c(
Expand Down Expand Up @@ -285,86 +259,89 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", inhe
set_version = function(version) {
parquet___ArrowWriterProperties___Builder__version(self, make_valid_version(version))
},

set_compression = function(table, compression){
private$.set(table, compression_from_name(compression), "compression", is.integer,
parquet___ArrowWriterProperties___Builder__default_compression,
set_compression = function(table, compression) {
compression <- compression_from_name(compression)
assert_that(is.integer(compression))
private$.set(table, compression,
parquet___ArrowWriterProperties___Builder__set_compressions
)
},

set_compression_level = function(table, compression_level){
private$.set(table, compression_level, "compression_level", is_integerish,
parquet___ArrowWriterProperties___Builder__default_compression_level,
assert_that(is_integerish(compression_level))
private$.set(table, compression_level,
parquet___ArrowWriterProperties___Builder__set_compression_levels
)
},

set_dictionary = function(table, use_dictionary) {
private$.set(table, use_dictionary, "use_dictionary", is.logical,
parquet___ArrowWriterProperties___Builder__default_use_dictionary,
assert_that(is.logical(use_dictionary))
private$.set(table, use_dictionary,
parquet___ArrowWriterProperties___Builder__set_use_dictionary
)
},

set_write_statistics = function(table, write_statistics) {
private$.set(table, write_statistics, "write_statistics", is.logical,
parquet___ArrowWriterProperties___Builder__default_write_statistics,
assert_that(is.logical(write_statistics))
private$.set(table, write_statistics,
parquet___ArrowWriterProperties___Builder__set_write_statistics
)
},

set_data_page_size = function(data_page_size) {
parquet___ArrowWriterProperties___Builder__data_page_size(self, data_page_size)
}
),

private = list(
.set = function(table, value, name, is, default, multiple) {
msg <- paste0("unsupported ", name, "= specification")
assert_that(is(value), msg = msg)
.set = function(table, value, FUN) {
msg <- paste0("unsupported ", substitute(value), "= specification")
column_names <- names(table)
if (is.null(given_names <- names(value))) {
if (length(value) == 1L) {
default(self, value)
} else if (length(value) == length(column_names)) {
multiple(self, column_names, value)
given_names <- names(value)
if (is.null(given_names)) {
if (length(value) %in% c(1L, length(column_names))) {
# If there's a single, unnamed value, FUN will set it globally
# If there are values for all columns, send them along with the names
FUN(self, column_names, value)
} else {
abort(msg)
}
} else if(all(given_names %in% column_names)) {
multiple(self, given_names, value)
} else if (all(given_names %in% column_names)) {
# Use the given names
FUN(self, given_names, value)
} else {
abort(msg)
}
}
)

)

ParquetWriterProperties$create <- function(table, version = NULL, compression = NULL, compression_level = NULL, use_dictionary = NULL, write_statistics = NULL, data_page_size = NULL) {
if (is.null(version) && is.null(compression) && is.null(compression_level) && is.null(use_dictionary) && is.null(write_statistics) && is.null(data_page_size)) {
shared_ptr(ParquetWriterProperties, parquet___default_writer_properties())
} else {
builder <- shared_ptr(ParquetWriterPropertiesBuilder, parquet___WriterProperties___Builder__create())
if (!is.null(version)) {
builder$set_version(version)
}
if (!is.null(compression)) {
builder$set_compression(table, compression = compression)
}
if (!is.null(compression_level)) {
builder$set_compression_level(table, compression_level = compression_level)
}
if (!is.null(use_dictionary)) {
builder$set_dictionary(table, use_dictionary)
}
if (!is.null(write_statistics)) {
builder$set_write_statistics(table, write_statistics)
}
if (!is.null(data_page_size)) {
builder$set_data_page_size(data_page_size)
}
shared_ptr(ParquetWriterProperties, parquet___WriterProperties___Builder__build(builder))
ParquetWriterProperties$create <- function(table,
version = NULL,
compression = NULL,
compression_level = NULL,
use_dictionary = NULL,
write_statistics = NULL,
data_page_size = NULL) {
builder <- shared_ptr(
ParquetWriterPropertiesBuilder,
parquet___WriterProperties___Builder__create()
)
if (!is.null(version)) {
builder$set_version(version)
}
if (!is.null(compression)) {
builder$set_compression(table, compression = compression)
}
if (!is.null(compression_level)) {
builder$set_compression_level(table, compression_level = compression_level)
}
if (!is.null(use_dictionary)) {
builder$set_dictionary(table, use_dictionary)
}
if (!is.null(write_statistics)) {
builder$set_write_statistics(table, write_statistics)
}
if (!is.null(data_page_size)) {
builder$set_data_page_size(data_page_size)
}
shared_ptr(ParquetWriterProperties, parquet___WriterProperties___Builder__build(builder))
}

#' @title ParquetFileWriter class
Expand All @@ -391,18 +368,13 @@ ParquetFileWriter <- R6Class("ParquetFileWriter", inherit = ArrowObject,
WriteTable = function(table, chunk_size) {
parquet___arrow___FileWriter__WriteTable(self, table, chunk_size)
},
Close = function() {
parquet___arrow___FileWriter__Close(self)
}
Close = function() parquet___arrow___FileWriter__Close(self)
)

)
ParquetFileWriter$create <- function(
schema,
sink,
properties = ParquetWriterProperties$create(),
arrow_properties = ParquetArrowWriterProperties$create()
) {
ParquetFileWriter$create <- function(schema,
sink,
properties = ParquetWriterProperties$create(),
arrow_properties = ParquetArrowWriterProperties$create()) {
shared_ptr(
ParquetFileWriter,
parquet___arrow___ParquetFileWriter__Open(schema, sink, properties, arrow_properties)
Expand Down

0 comments on commit 36b5a96

Please sign in to comment.