[GLUTEN-1433][VL] feat: support timestamp - phase 1 (#1435)
rui-mo committed May 16, 2023
1 parent 2b4cfcc commit 7385bb3
Showing 14 changed files with 882 additions and 741 deletions.
@@ -70,7 +70,9 @@ object VeloxBackendSettings extends BackendSettings {
   format match {
     case ParquetReadFormat => validateTypes && validateFilePath
     case DwrfReadFormat => true
-    case OrcReadFormat => true
+    case OrcReadFormat => fields.map(_.dataType).collect {
+      case _: TimestampType =>
+    }.isEmpty
     case _ => false
   }
 }
@@ -341,19 +341,11 @@ class VeloxDataTypeValidationSuite extends WholeStageTransformerSuite {
   }
 
   test("Timestamp type") {
-    // Validation: BatchScan Project Aggregate Expand Sort Limit
-    runQueryAndCompare("select int, timestamp from type1 " +
-      " group by grouping sets(int, timestamp) sort by timestamp, int limit 1") { _ => }
-
-    // Validation: BroadHashJoin, Filter, Project
-    super.sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "10M")
-    runQueryAndCompare("select type1.timestamp from type1," +
-      " type2 where type1.timestamp = type2.timestamp") { _ => }
-
-    // Validation: ShuffledHashJoin, Filter, Project
-    super.sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
-    runQueryAndCompare("select type1.timestamp from type1," +
-      " type2 where type1.timestamp = type2.timestamp") { _ => }
+    runQueryAndCompare("select timestamp from type1 limit 100") { df => {
+      val executedPlan = getExecutedPlan(df)
+      assert(executedPlan.exists(plan =>
+        plan.find(child => child.isInstanceOf[BatchScanExecTransformer]).isDefined))
+    }}
   }
 
   test("Struct type") {
cpp/velox/compute/VeloxColumnarToRowConverter.cc (4 changes: 2 additions & 2 deletions)
@@ -186,10 +186,10 @@ arrow::Status VeloxColumnarToRowConverter::write() {
         setNullAt(bufferAddress_, offsets_[rowIdx], field_offset, col_idx);
       }
     } else {
-      auto longDecimal = vec->asFlatVector<velox::UnscaledLongDecimal>()->rawValues();
       if (flag) {
         setNullAt(bufferAddress_, offsets_[rowIdx], field_offset, col_idx);
       } else {
+        auto longDecimal = vec->asFlatVector<velox::UnscaledLongDecimal>()->rawValues();
         int32_t size;
         velox::int128_t veloxInt128 = longDecimal[rowIdx].unscaledValue();
 
@@ -213,7 +213,7 @@ arrow::Status VeloxColumnarToRowConverter::write() {
         }
         break;
       }
-      case arrow::Time64Type::type_id: {
+      case arrow::TimestampType::type_id: {
         SERIALIZE_COLUMN(TimestampType);
         break;
       }
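Side note on this hunk: Arrow distinguishes time-of-day types from instants. arrow::Time64Type is time-of-day without a date, while a Velox TIMESTAMP column arrives as arrow::timestamp(), so the switch must key on arrow::TimestampType::type_id. A minimal standalone sketch of the distinction (Arrow C++ only, not Gluten code):

    #include <arrow/api.h>
    #include <iostream>

    int main() {
      // arrow::timestamp() carries date + time; arrow::time64() is time-of-day
      // only. Their type ids differ, so a case on Time64Type::type_id would
      // never match a timestamp column.
      auto ts = arrow::timestamp(arrow::TimeUnit::MICRO);
      std::cout << (ts->id() == arrow::TimestampType::type_id) << "\n";  // 1
      std::cout << (ts->id() == arrow::Time64Type::type_id) << "\n";     // 0
      return 0;
    }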
cpp/velox/compute/WholeStageResultIterator.cc (38 changes: 36 additions & 2 deletions)
@@ -41,6 +41,40 @@ const std::string kTotalScanTime = "totalScanTime";
 // others
 const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
 std::atomic<int32_t> taskSerial;
+
+// From_hex from dlib.
+inline unsigned char fromHex(unsigned char ch) {
+  if (ch <= '9' && ch >= '0')
+    ch -= '0';
+  else if (ch <= 'f' && ch >= 'a')
+    ch -= 'a' - 10;
+  else if (ch <= 'F' && ch >= 'A')
+    ch -= 'A' - 10;
+  else
+    ch = 0;
+  return ch;
+}
+
+// URL decoder from dlib.
+const std::string urlDecode(const std::string& str) {
+  std::string result;
+  std::string::size_type i;
+  for (i = 0; i < str.size(); ++i) {
+    if (str[i] == '+') {
+      result += ' ';
+    } else if (str[i] == '%' && str.size() > i + 2) {
+      const unsigned char ch1 = fromHex(str[i + 1]);
+      const unsigned char ch2 = fromHex(str[i + 2]);
+      const unsigned char ch = (ch1 << 4) | ch2;
+      result += ch;
+      i += 2;
+    } else {
+      result += str[i];
+    }
+  }
+  return result;
+}
+
 } // namespace
 
 std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQueryCtx() {
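The decoder above is applied when partition values are read back from file paths, as the next hunk shows. A minimal usage sketch (the encoded value is illustrative, and it assumes the urlDecode helper above is in scope):

    #include <iostream>
    #include <string>

    int main() {
      // Hypothetical demo value: a timestamp partition value as it can appear
      // in a Hive-style path, with ':' percent-encoded as %3A.
      const std::string raw = "2023-05-16 11%3A22%3A33";
      std::cout << urlDecode(raw) << std::endl;  // prints: 2023-05-16 11:22:33
      return 0;
    }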
@@ -354,8 +388,8 @@ WholeStageResultIteratorFirstStage::extractPartitionColumnAndValue(const std::st
     if (partitionValue == kHiveDefaultPartition) {
       partitionKeys[partitionColumn] = std::nullopt;
     } else {
-      // Set to the map of partition keys.
-      partitionKeys[partitionColumn] = partitionValue;
+      // Set to the map of partition keys. Timestamp could be URL encoded.
+      partitionKeys[partitionColumn] = urlDecode(partitionValue);
     }
     // For processing the remaining keys.
     str = latterPart.substr(pos + 1);
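For context, extractPartitionColumnAndValue walks the path's column=value segments, recording __HIVE_DEFAULT_PARTITION__ as a null value. A hypothetical standalone sketch of that loop (names and input are illustrative, not the actual Gluten implementation, which also applies urlDecode):

    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>

    int main() {
      const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
      std::map<std::string, std::optional<std::string>> partitionKeys;
      std::string str = "a=1/ts=__HIVE_DEFAULT_PARTITION__";
      while (!str.empty()) {
        auto slash = str.find('/');
        std::string pair = str.substr(0, slash);
        auto eq = pair.find('=');
        std::string column = pair.substr(0, eq);
        std::string value = pair.substr(eq + 1);
        if (value == kHiveDefaultPartition) {
          partitionKeys[column] = std::nullopt;  // default partition -> null
        } else {
          partitionKeys[column] = value;  // the real code applies urlDecode here
        }
        if (slash == std::string::npos) break;
        str = str.substr(slash + 1);  // process the remaining keys
      }
      for (const auto& [k, v] : partitionKeys)
        std::cout << k << " -> " << (v ? *v : "NULL") << "\n";
      return 0;
    }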