Skip to content

Commit

Permalink
ARROW-7772: [R][C++][Dataset] Unable to filter on date32 object with …
Browse files Browse the repository at this point in the history
…date64 scalar

I fixed the issue in the R bindings that triggered @stephhazlitt's report. Then I added another test that still causes the crash.

The crash message points at this line: https://github.com/apache/arrow/blob/master/cpp/src/arrow/scalar.cc#L333

```
/Users/enpiar/Documents/ursa/arrow/cpp/src/arrow/dataset/filter.cc:929:  Check failed: _s.ok() Operation failed: maybe_value.status()
Bad status: NotImplemented: casting scalars of type timestamp[s] to type date32[day]
In /Users/enpiar/Documents/ursa/arrow/cpp/src/arrow/scalar.cc, line 333, code: VisitTypeInline(*to, &unpack_to_type)
```

@bkietz over to you to catch that crash and also hopefully to support this cast.

Closes #6354 from nealrichardson/date-scalar and squashes the following commits:

b620b06 <Neal Richardson> Add news entry
e1a50ec <Neal Richardson> Add some more R tests
a2d3eb9 <Benjamin Kietzman> add support for casting dates, times, durations, timestamps
11451e8 <Neal Richardson> Failing test that reproduces the C++ issue
b73adb0 <Neal Richardson> Test and fix for Date scalar filtering

Lead-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
  • Loading branch information
bkietz and nealrichardson committed Feb 7, 2020
1 parent 83afab5 commit cb686b3
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 20 deletions.
29 changes: 14 additions & 15 deletions cpp/src/arrow/dataset/filter.cc
Expand Up @@ -917,17 +917,16 @@ struct InsertImplicitCastsImpl {
return std::move(out);
}

void Cast(std::shared_ptr<DataType> type, std::shared_ptr<Expression>* expr) {
if ((*expr)->type() != ExpressionType::SCALAR) {
*expr = (*expr)->CastTo(type).Copy();
return;
Result<std::shared_ptr<Expression>> Cast(std::shared_ptr<DataType> type,
const Expression& expr) {
if (expr.type() != ExpressionType::SCALAR) {
return expr.CastTo(type).Copy();
}

// cast the scalar directly
const auto& value = checked_cast<const ScalarExpression&>(**expr).value();
auto maybe_value = value->CastTo(std::move(type));
DCHECK_OK(maybe_value.status());
*expr = scalar(*maybe_value);
const auto& value = checked_cast<const ScalarExpression&>(expr).value();
ARROW_ASSIGN_OR_RAISE(auto cast_value, value->CastTo(std::move(type)));
return scalar(cast_value);
}

Result<std::shared_ptr<Expression>> operator()(const InExpression& expr) {
Expand All @@ -948,7 +947,7 @@ struct InsertImplicitCastsImpl {
ARROW_ASSIGN_OR_RAISE(auto op, InsertCastsAndValidate(*expr.operand()));

if (op.type->id() != Type::BOOL) {
Cast(boolean(), &op.expr);
ARROW_ASSIGN_OR_RAISE(op.expr, Cast(boolean(), *op.expr));
}
return not_(std::move(op.expr));
}
Expand All @@ -958,10 +957,10 @@ struct InsertImplicitCastsImpl {
ARROW_ASSIGN_OR_RAISE(auto rhs, InsertCastsAndValidate(*expr.right_operand()));

if (lhs.type->id() != Type::BOOL) {
Cast(boolean(), &lhs.expr);
ARROW_ASSIGN_OR_RAISE(lhs.expr, Cast(boolean(), *lhs.expr));
}
if (rhs.type->id() != Type::BOOL) {
Cast(boolean(), &rhs.expr);
ARROW_ASSIGN_OR_RAISE(rhs.expr, Cast(boolean(), *rhs.expr));
}
return and_(std::move(lhs.expr), std::move(rhs.expr));
}
Expand All @@ -971,10 +970,10 @@ struct InsertImplicitCastsImpl {
ARROW_ASSIGN_OR_RAISE(auto rhs, InsertCastsAndValidate(*expr.right_operand()));

if (lhs.type->id() != Type::BOOL) {
Cast(boolean(), &lhs.expr);
ARROW_ASSIGN_OR_RAISE(lhs.expr, Cast(boolean(), *lhs.expr));
}
if (rhs.type->id() != Type::BOOL) {
Cast(boolean(), &rhs.expr);
ARROW_ASSIGN_OR_RAISE(rhs.expr, Cast(boolean(), *rhs.expr));
}
return or_(std::move(lhs.expr), std::move(rhs.expr));
}
Expand All @@ -988,9 +987,9 @@ struct InsertImplicitCastsImpl {
}

if (lhs.expr->type() == ExpressionType::SCALAR) {
Cast(rhs.type, &lhs.expr);
ARROW_ASSIGN_OR_RAISE(lhs.expr, Cast(rhs.type, *lhs.expr));
} else {
Cast(lhs.type, &rhs.expr);
ARROW_ASSIGN_OR_RAISE(rhs.expr, Cast(lhs.type, *rhs.expr));
}
return std::make_shared<ComparisonExpression>(expr.op(), std::move(lhs.expr),
std::move(rhs.expr));
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/filter_test.cc
Expand Up @@ -352,6 +352,8 @@ TEST_F(ExpressionsTest, ImplicitCast) {
ASSERT_OK_AND_ASSIGN(filter, InsertImplicitCasts("a"_.In(set_double), *schema_));
auto set_int32 = ArrayFromJSON(int32(), R"([1, 2, 3])");
ASSERT_EQ(E{filter}, E{"a"_.In(set_int32)});

ASSERT_RAISES(Invalid, InsertImplicitCasts("nope"_ == 0.0, *schema_));
}

TEST_F(FilterTest, ImplicitCast) {
Expand Down
59 changes: 59 additions & 0 deletions cpp/src/arrow/scalar.cc
Expand Up @@ -219,6 +219,65 @@ Status CastImpl(const TimestampScalar& from, TimestampScalar* to) {
return util::ConvertTimestampValue(from.type, to->type, from.value).Value(&to->value);
}

template <typename TypeWithTimeUnit>
std::shared_ptr<DataType> AsTimestampType(const std::shared_ptr<DataType>& type) {
return timestamp(checked_cast<const TypeWithTimeUnit&>(*type).unit());
}

// duration to duration
Status CastImpl(const DurationScalar& from, DurationScalar* to) {
return util::ConvertTimestampValue(AsTimestampType<DurationType>(from.type),
AsTimestampType<DurationType>(to->type), from.value)
.Value(&to->value);
}

// time to time
template <typename F, typename ToScalar, typename T = typename ToScalar::TypeClass>
enable_if_time<T, Status> CastImpl(const TimeScalar<F>& from, ToScalar* to) {
return util::ConvertTimestampValue(AsTimestampType<F>(from.type),
AsTimestampType<T>(to->type), from.value)
.Value(&to->value);
}

constexpr int64_t kMillisecondsInDay = 86400000;

// date to date
Status CastImpl(const Date32Scalar& from, Date64Scalar* to) {
to->value = from.value * kMillisecondsInDay;
return Status::OK();
}
Status CastImpl(const Date64Scalar& from, Date32Scalar* to) {
to->value = static_cast<int32_t>(from.value / kMillisecondsInDay);
return Status::OK();
}

// timestamp to date
Status CastImpl(const TimestampScalar& from, Date64Scalar* to) {
ARROW_ASSIGN_OR_RAISE(
auto millis,
util::ConvertTimestampValue(from.type, timestamp(TimeUnit::MILLI), from.value));
to->value = millis - millis % kMillisecondsInDay;
return Status::OK();
}
Status CastImpl(const TimestampScalar& from, Date32Scalar* to) {
ARROW_ASSIGN_OR_RAISE(
auto millis,
util::ConvertTimestampValue(from.type, timestamp(TimeUnit::MILLI), from.value));
to->value = static_cast<int32_t>(millis / kMillisecondsInDay);
return Status::OK();
}

// date to timestamp
template <typename D>
Status CastImpl(const DateScalar<D>& from, TimestampScalar* to) {
int64_t millis = from.value;
if (std::is_same<D, Date32Type>::value) {
millis *= kMillisecondsInDay;
}
return util::ConvertTimestampValue(timestamp(TimeUnit::MILLI), to->type, millis)
.Value(&to->value);
}

Status CastImpl(const TimestampScalar& from, StringScalar* to) {
to->value = Buffer::FromString(std::to_string(from.value));
return Status::OK();
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/scalar_test.cc
Expand Up @@ -305,6 +305,19 @@ TEST(TestTimestampScalars, Cast) {

EXPECT_EQ(convert(TimeUnit::NANO, TimeUnit::MICRO, 1234), 1);
EXPECT_EQ(convert(TimeUnit::MICRO, TimeUnit::MILLI, 4567), 4);

ASSERT_OK_AND_ASSIGN(auto str,
TimestampScalar(1024, timestamp(TimeUnit::MILLI)).CastTo(utf8()));
EXPECT_EQ(*str, StringScalar("1024"));
ASSERT_OK_AND_ASSIGN(auto i64,
TimestampScalar(1024, timestamp(TimeUnit::MILLI)).CastTo(int64()));
EXPECT_EQ(*i64, Int64Scalar(1024));

constexpr int64_t kMillisecondsInDay = 86400000;
ASSERT_OK_AND_ASSIGN(
auto d64, TimestampScalar(1024 * kMillisecondsInDay + 3, timestamp(TimeUnit::MILLI))
.CastTo(date64()));
EXPECT_EQ(*d64, Date64Scalar(1024 * kMillisecondsInDay));
}

TEST(TestDurationScalars, Basics) {
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/util/time.h
Expand Up @@ -45,9 +45,9 @@ static std::pair<DivideOrMultiply, int64_t> kTimestampConversionTable[4][4] = {
//
// This function takes care of properly transforming from one unit to another.
//
// \param[in] in, the input type. Must be TimestampType.
// \param[in] out, the output type. Must be TimestampType.
// \param[in] value, the input value.
// \param[in] in the input type. Must be TimestampType.
// \param[in] out the output type. Must be TimestampType.
// \param[in] value the input value.
//
// \return The converted value, or an error.
ARROW_EXPORT Result<int64_t> ConvertTimestampValue(const std::shared_ptr<DataType>& in,
Expand Down
2 changes: 2 additions & 0 deletions r/NEWS.md
Expand Up @@ -19,6 +19,8 @@

# arrow 0.16.0.9000

* Dataset filtering is now correctly supported for all Arrow date/time/timestamp column types.

# arrow 0.16.0

## Multi-file datasets
Expand Down
3 changes: 1 addition & 2 deletions r/src/expression.cpp
Expand Up @@ -108,9 +108,8 @@ std::shared_ptr<ds::ScalarExpression> dataset___expr__scalar(SEXP x) {
return ds::scalar(Rf_asLogical(x));
case REALSXP:
if (Rf_inherits(x, "Date")) {
constexpr static int64_t kMillisecondsPerDay = 86400000;
return std::make_shared<ds::ScalarExpression>(
std::make_shared<arrow::Date64Scalar>(REAL(x)[0] * kMillisecondsPerDay));
std::make_shared<arrow::Date32Scalar>(REAL(x)[0]));
} else if (Rf_inherits(x, "POSIXct")) {
return std::make_shared<ds::ScalarExpression>(
std::make_shared<arrow::TimestampScalar>(
Expand Down
53 changes: 53 additions & 0 deletions r/tests/testthat/test-dataset.R
Expand Up @@ -212,6 +212,59 @@ test_that("filter() on timestamp columns", {
collect(),
df1[5:10, c("ts")],
)

# Now with Date
expect_equivalent(
ds %>%
filter(ts >= as.Date("2015-05-04")) %>%
filter(part == 1) %>%
select(ts) %>%
collect(),
df1[5:10, c("ts")],
)

# Now with bare string date
expect_equivalent(
ds %>%
filter(ts >= "2015-05-04") %>%
filter(part == 1) %>%
select(ts) %>%
collect(),
df1[5:10, c("ts")],
)
})

test_that("filter() on date32 columns", {
tmp <- tempfile()
dir.create(tmp)
df <- data.frame(date = as.Date(c("2020-02-02", "2020-02-03")))
write_parquet(df, file.path(tmp, "file.parquet"))

expect_equal(
open_dataset(tmp) %>%
filter(date > as.Date("2020-02-02")) %>%
collect() %>%
nrow(),
1L
)

# Also with timestamp scalar
expect_equal(
open_dataset(tmp) %>%
filter(date > lubridate::ymd_hms("2020-02-02 00:00:00")) %>%
collect() %>%
nrow(),
1L
)
})

test_that("filter scalar validation doesn't crash (ARROW-7772)", {
expect_error(
ds %>%
filter(int == "fff", part == 1) %>%
collect(),
"error parsing 'fff' as scalar of type int32"
)
})

test_that("collect() on Dataset works (if fits in memory)", {
Expand Down

0 comments on commit cb686b3

Please sign in to comment.