Skip to content

Commit

Permalink
GH-37799: [C++] Compute: CommonTemporal support time32 and time64 cas…
Browse files Browse the repository at this point in the history
…ting (#37949)

### Rationale for this change

The original problem in mentioned in #37799

1. `SECOND` in Parquet would always cast to `MILLS`
2. `eq` not support `eq(time32[s], time32[ms)`

This patch is as advice in #37799 (comment) . We tent to add time32 and time64 in `CommonTemporal`.

### What changes are included in this PR?

Support time32 and time64 with different time unit in `arrow::compute::internal::CommonTemporal`.

### Are these changes tested?

Yes

### Are there any user-facing changes?

bugfix

* Closes: #37799

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
mapleFU committed Oct 2, 2023
1 parent b0eb021 commit 22df70a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
49 changes: 40 additions & 9 deletions cpp/src/arrow/compute/kernels/codegen_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ TypeHolder CommonTemporal(const TypeHolder* begin, size_t count) {
bool saw_date32 = false;
bool saw_date64 = false;
bool saw_duration = false;
bool saw_time_since_midnight = false;
const TypeHolder* end = begin + count;
for (auto it = begin; it != end; it++) {
auto id = it->type->id();
Expand All @@ -271,6 +272,18 @@ TypeHolder CommonTemporal(const TypeHolder* begin, size_t count) {
finest_unit = std::max(finest_unit, ty.unit());
continue;
}
case Type::TIME32: {
const auto& type = checked_cast<const Time32Type&>(*it->type);
finest_unit = std::max(finest_unit, type.unit());
saw_time_since_midnight = true;
continue;
}
case Type::TIME64: {
const auto& type = checked_cast<const Time64Type&>(*it->type);
finest_unit = std::max(finest_unit, type.unit());
saw_time_since_midnight = true;
continue;
}
case Type::DURATION: {
const auto& ty = checked_cast<const DurationType&>(*it->type);
finest_unit = std::max(finest_unit, ty.unit());
Expand All @@ -282,15 +295,33 @@ TypeHolder CommonTemporal(const TypeHolder* begin, size_t count) {
}
}

if (timezone) {
// At least one timestamp seen
return timestamp(finest_unit, *timezone);
} else if (saw_date64) {
return date64();
} else if (saw_date32) {
return date32();
} else if (saw_duration) {
return duration(finest_unit);
bool saw_timestamp_or_date = timezone || saw_date64 || saw_date32 || saw_duration;

if (saw_time_since_midnight && saw_timestamp_or_date) {
// Cannot find common type
return TypeHolder(nullptr);
}
if (saw_timestamp_or_date) {
if (timezone) {
// At least one timestamp seen
return timestamp(finest_unit, *timezone);
} else if (saw_date64) {
return date64();
} else if (saw_date32) {
return date32();
} else if (saw_duration) {
return duration(finest_unit);
}
}
if (saw_time_since_midnight) {
switch (finest_unit) {
case TimeUnit::SECOND:
case TimeUnit::MILLI:
return time32(finest_unit);
case TimeUnit::MICRO:
case TimeUnit::NANO:
return time64(finest_unit);
}
}
return TypeHolder(nullptr);
}
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/compute/kernels/codegen_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ TEST(TestDispatchBest, CommonTemporal) {
args = {timestamp(TimeUnit::SECOND, "America/Phoenix"),
timestamp(TimeUnit::SECOND, "UTC")};
ASSERT_EQ(CommonTemporal(args.data(), args.size()), nullptr);

args = {time32(TimeUnit::SECOND), time32(TimeUnit::MILLI)};
AssertTypeEqual(*time32(TimeUnit::MILLI), *CommonTemporal(args.data(), args.size()));

args = {time32(TimeUnit::SECOND), time64(TimeUnit::NANO)};
AssertTypeEqual(*time64(TimeUnit::NANO), *CommonTemporal(args.data(), args.size()));

args = {date32(), time32(TimeUnit::SECOND)};
ASSERT_EQ(CommonTemporal(args.data(), args.size()), nullptr);

args = {timestamp(TimeUnit::SECOND), time32(TimeUnit::SECOND)};
ASSERT_EQ(CommonTemporal(args.data(), args.size()), nullptr);
}

TEST(TestDispatchBest, CommonTemporalResolution) {
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,31 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingDuratio
CountRowGroupsInFragment(fragment, {0}, expr);
}

TEST_P(TestParquetFileFormatScan,
PredicatePushdownRowGroupFragmentsUsingTimestampColumn) {
// GH-37799: Parquet arrow will change TimeUnit::SECOND to TimeUnit::MILLI
// because parquet LogicalType doesn't support SECOND.
for (auto time_unit : {TimeUnit::MILLI, TimeUnit::SECOND}) {
auto table = TableFromJSON(schema({field("t", time32(time_unit))}),
{
R"([{"t": 1}])",
R"([{"t": 2}, {"t": 3}])",
});
TableBatchReader table_reader(*table);
ARROW_SCOPED_TRACE("time_unit=", time_unit);
ASSERT_OK_AND_ASSIGN(
auto source,
ParquetFormatHelper::Write(
&table_reader, ArrowWriterProperties::Builder().store_schema()->build())
.As<FileSource>());
SetSchema({field("t", time32(time_unit))});
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(source));

auto expr = equal(field_ref("t"), literal(::arrow::Time32Scalar(1, time_unit)));
CountRowGroupsInFragment(fragment, {0}, expr);
}
}

// Tests projection with nested/indexed FieldRefs.
// https://github.com/apache/arrow/issues/35579
TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
Expand Down

0 comments on commit 22df70a

Please sign in to comment.