
Commit

fix
rui-mo committed May 11, 2023
1 parent 7d64301 commit f7561a9
Showing 12 changed files with 206 additions and 55 deletions.
@@ -72,7 +72,9 @@ object VeloxBackendSettings extends BackendSettings {
format match {
case ParquetReadFormat => validateTypes && validateFilePath
case DwrfReadFormat => true
case OrcReadFormat => fields.map(_.dataType).collect {
case _: TimestampType =>
}.isEmpty
case _ => false
}
}
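The new OrcReadFormat branch marks a scan as unsupported as soon as the schema contains a timestamp column, presumably because timestamps are not yet handled on the ORC read path (the test settings further down exclude the related timestamp cases as well). A minimal Scala sketch of the same check, using illustrative fields that are not part of this commit:

import org.apache.spark.sql.types.{IntegerType, StructField, TimestampType}

object OrcTimestampCheck {
  // Supported only when no field is a timestamp, mirroring the collect { ... }.isEmpty test above.
  def supportsOrcScan(fields: Seq[StructField]): Boolean =
    !fields.exists(_.dataType.isInstanceOf[TimestampType])

  def main(args: Array[String]): Unit = {
    println(supportsOrcScan(Seq(StructField("id", IntegerType)))) // true
    println(supportsOrcScan(Seq(StructField("id", IntegerType), StructField("ts", TimestampType)))) // false
  }
}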
4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxColumnarToRowConverter.cc
@@ -185,10 +185,10 @@ arrow::Status VeloxColumnarToRowConverter::Write() {
SetNullAt(buffer_address_, offsets_[row_idx], field_offset, col_idx);
}
} else {
if (flag) {
SetNullAt(buffer_address_, offsets_[row_idx], field_offset, col_idx);
} else {
auto longDecimal = vec->asFlatVector<velox::UnscaledLongDecimal>()->rawValues();
int32_t size;
velox::int128_t veloxInt128 = longDecimal[row_idx].unscaledValue();

@@ -212,7 +212,7 @@ arrow::Status VeloxColumnarToRowConverter::Write() {
}
break;
}
case arrow::TimestampType::type_id: {
SERIALIZE_COLUMN(TimestampType);
break;
}
38 changes: 36 additions & 2 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -40,6 +40,40 @@ const std::string kTotalScanTime = "totalScanTime";
// others
const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
std::atomic<int32_t> taskSerial;

// From_hex from dlib.
inline unsigned char fromHex(unsigned char ch) {
if (ch <= '9' && ch >= '0')
ch -= '0';
else if (ch <= 'f' && ch >= 'a')
ch -= 'a' - 10;
else if (ch <= 'F' && ch >= 'A')
ch -= 'A' - 10;
else
ch = 0;
return ch;
}

// URL decoder from dlib.
const std::string urlDecode(const std::string& str) {
std::string result;
std::string::size_type i;
for (i = 0; i < str.size(); ++i) {
if (str[i] == '+') {
result += ' ';
} else if (str[i] == '%' && str.size() > i + 2) {
const unsigned char ch1 = fromHex(str[i + 1]);
const unsigned char ch2 = fromHex(str[i + 2]);
const unsigned char ch = (ch1 << 4) | ch2;
result += ch;
i += 2;
} else {
result += str[i];
}
}
return result;
}

} // namespace

std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQueryCtx() {
@@ -353,8 +387,8 @@ WholeStageResultIteratorFirstStage::extractPartitionColumnAndValue(const std::st
if (partitionValue == kHiveDefaultPartition) {
partitionKeys[partitionColumn] = std::nullopt;
} else {
// Set to the map of partition keys. Timestamp could be URL encoded.
partitionKeys[partitionColumn] = urlDecode(partitionValue);
}
// For processing the remaining keys.
str = latterPart.substr(pos + 1);
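Hive-style partition directories percent-encode characters that cannot appear in a path, so a timestamp partition value such as 2016-12-01 00:00:00 is stored as 2016-12-01 00%3A00%3A00; without the urlDecode call above, the encoded string would be handed to Velox as the partition key. The JVM side of this commit relies on java.net.URLDecoder for the same purpose. A small self-contained Scala sketch of the decoding, with a made-up column name and value:

import java.net.URLDecoder
import java.nio.charset.StandardCharsets

object PartitionValueDecode {
  def main(args: Array[String]): Unit = {
    // Hypothetical partition directory fragment: column "ts" with a URL-encoded timestamp value.
    val encoded = "ts=2016-12-01%2000%3A00%3A00"
    val Array(column, value) = encoded.split("=", 2)
    val decoded = URLDecoder.decode(value, StandardCharsets.UTF_8.name())
    println(s"$column -> $decoded") // prints: ts -> 2016-12-01 00:00:00
  }
}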
101 changes: 64 additions & 37 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -193,9 +193,10 @@ arrow::Status VeloxShuffleWriter::Split(ColumnarBatch* cb) {

auto result = arrow::ImportRecordBatch(&arrowArray, schema_);
RETURN_NOT_OK(result);
std::shared_ptr<arrow::RecordBatch> batchPtr = result.ValueOrDie();

// 2. call CacheRecordBatch with RecordBatch
RETURN_NOT_OK(CacheRecordBatch(0, *batchPtr));
} else {
auto& rv = *veloxColumnBatch->getFlattenedRowVector();
RETURN_NOT_OK(InitFromRowVector(rv));
Expand Down Expand Up @@ -346,25 +347,49 @@ arrow::Status VeloxShuffleWriter::SplitFixedWidthValueBuffer(const velox::RowVec
      case 32:
        RETURN_NOT_OK(SplitFixedType<uint32_t>(src_addr, dst_addrs));
        break;
      case 64: {
        if (column->type()->kind() == velox::TypeKind::TIMESTAMP) {
          std::transform(
              dst_addrs.begin(),
              dst_addrs.end(),
              partition_buffer_idx_base_.begin(),
              partition_buffer_idx_offset_.begin(),
              [](uint8_t* x, uint32_t y) { return x + y * sizeof(int64_t); });

          const auto& tsVec = column->asFlatVector<velox::Timestamp>();
          for (uint32_t pid = 0; pid < num_partitions_; ++pid) {
            auto dst_pid_base = reinterpret_cast<int64_t*>(partition_buffer_idx_offset_[pid]);
            auto pos = partition_2_row_offset_[pid];
            auto end = partition_2_row_offset_[pid + 1];
            for (; pos < end; ++pos) {
              auto row_id = row_offset_2_row_id_[pos];
              if (tsVec->mayHaveNulls() && tsVec->isNullAt(row_id)) {
                *dst_pid_base++ = 0;
              } else {
                *dst_pid_base++ = tsVec->valueAt(row_id).toMicros(); // copy
              }
            }
          }
          break;
        } else {
#ifdef PROCESSAVX
          std::transform(
              dst_addrs.begin(),
              dst_addrs.end(),
              partition_buffer_idx_base_.begin(),
              partition_buffer_idx_offset_.begin(),
              [](uint8_t* x, row_offset_type y) { return x + y * sizeof(uint64_t); });
          for (auto pid = 0; pid < num_partitions_; pid++) {
            auto dst_pid_base = reinterpret_cast<uint64_t*>(partition_buffer_idx_offset_[pid]); /*32k*/
            auto r = partition_2_row_offset_[pid]; /*8k*/
            auto size = partition_2_row_offset_[pid + 1];
#if 1
            for (r; r < size && (((uint64_t)dst_pid_base & 0x1f) > 0); r++) {
              auto src_offset = row_offset_2_row_id_[r]; /*16k*/
              *dst_pid_base = reinterpret_cast<uint64_t*>(src_addr)[src_offset]; /*64k*/
              _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2);
              dst_pid_base += 1;
            }
#if 0
            for (r; r+4<size; r+=4)
            {

@@ -383,31 +408,33 @@ arrow::Status VeloxShuffleWriter::SplitFixedWidthValueBuffer(const velox::RowVec

              dst_pid_base+=4;
            }
#endif
            for (r; r + 2 < size; r += 2) {
              __m128i src_offset_2x = _mm_cvtsi32_si128(*((int32_t*)(row_offset_2_row_id_.data() + r)));
              src_offset_2x = _mm_shufflelo_epi16(src_offset_2x, 0x98);

              __m128i src_2x = _mm_i32gather_epi64((const long long int*)src_addr, src_offset_2x, 8);
              _mm_store_si128((__m128i*)dst_pid_base, src_2x);
              //_mm_stream_si128((__m128i*)dst_pid_base,src_2x);

              _mm_prefetch(&(src_addr)[(uint32_t)row_offset_2_row_id_[r] * sizeof(uint64_t) + 64], _MM_HINT_T2);
              _mm_prefetch(&(src_addr)[(uint32_t)row_offset_2_row_id_[r + 1] * sizeof(uint64_t) + 64], _MM_HINT_T2);
              dst_pid_base += 2;
            }
#endif
            for (r; r < size; r++) {
              auto src_offset = row_offset_2_row_id_[r]; /*16k*/
              *dst_pid_base = reinterpret_cast<const uint64_t*>(src_addr)[src_offset]; /*64k*/
              _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2);
              dst_pid_base += 1;
            }
          }
          break;
#else
          RETURN_NOT_OK(SplitFixedType<uint64_t>(src_addr, dst_addrs));
#endif
          break;
        }
      }
#if defined(__x86_64__)
case 128: // arrow::Decimal128Type::type_id
// too bad gcc generates movdqa even we use __m128i_u data type.
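In the timestamp branch of SplitFixedWidthValueBuffer above, each value is written as a single int64 of microseconds via Timestamp::toMicros(). Assuming velox::Timestamp carries a seconds part and a nanoseconds part, the conversion amounts to seconds * 1,000,000 + nanos / 1,000. A rough Scala sketch of that arithmetic, with a stand-in type and overflow handling omitted:

// Stand-in for velox::Timestamp, only to illustrate the microsecond conversion assumed above.
final case class TimestampSketch(seconds: Long, nanos: Long) {
  def toMicros: Long = seconds * 1000000L + nanos / 1000L
}

object TimestampToMicros {
  def main(args: Array[String]): Unit = {
    // 1683763200 seconds is 2023-05-11 00:00:00 UTC, the day this commit landed.
    val ts = TimestampSketch(1683763200L, 123456789L)
    println(ts.toMicros) // 1683763200123456
  }
}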
@@ -68,7 +68,10 @@ case class TransformPreOverrides(isAdaptiveContextOrTopParentExchange: Boolean)
val project = ProjectExec(
Seq(Alias(hashExpression, "hash_partition_key")()) ++ child.output, child)
AddTransformHintRule().apply(project)
TransformHints.getHint(project) match {
case TRANSFORM_SUPPORTED() => replaceWithTransformerPlan(project)
case TRANSFORM_UNSUPPORTED() => project
}
}

/**
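The TransformPreOverrides change above guards the projection that Gluten injects for the hash partition key: the native transformer is substituted only when the transform hint reports the projection as supported, otherwise the vanilla Spark ProjectExec is kept. A hedged Scala sketch of that fallback pattern, using hypothetical types rather than the actual Gluten API:

sealed trait Hint
case object Supported extends Hint
case object Unsupported extends Hint

object TransformFallback {
  // Offload only when the hint allows it; otherwise keep the original operator.
  def maybeTransform[Plan](plan: Plan, hint: Hint)(toNative: Plan => Plan): Plan =
    hint match {
      case Supported => toNative(plan)
      case Unsupported => plan
    }

  def main(args: Array[String]): Unit = {
    val project = "ProjectExec(hash_partition_key)"
    println(maybeTransform(project, Unsupported)(p => s"ProjectExecTransformer($p)")) // original kept
  }
}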
@@ -28,7 +28,7 @@ import io.glutenproject.metrics.IMetrics
import io.glutenproject.substrait.plan.PlanNode
import io.glutenproject.substrait.rel.LocalFilesBuilder
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import io.glutenproject.utils.GlutenImplicitClass.{ArrowColumnarBatchRetainer, coalesce}
import io.glutenproject.vectorized._
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.{InterruptibleIterator, Partition, SparkConf, SparkContext, TaskContext}
@@ -44,11 +44,12 @@ import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
import org.apache.spark.sql.utils.SparkArrowUtil
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.ExecutorManager
import org.apache.spark.util.memory.TaskMemoryResources

import java.net.URLDecoder
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -73,7 +74,7 @@ abstract class GlutenIteratorApi extends IteratorApi with Logging {
val lengths = new java.util.ArrayList[java.lang.Long]()
val fileFormat = wsCxt.substraitContext.getFileFormat.get(0)
f.files.foreach { file =>
paths.add(URLDecoder.decode(file.filePath, StandardCharsets.UTF_8.name()))
starts.add(new java.lang.Long(file.start))
lengths.add(new java.lang.Long(file.length))
}
@@ -282,6 +282,7 @@ case class GlutenRowToArrowColumnarExec(child: SparkPlan)
case _: BinaryType =>
case _: DecimalType =>
case _: DateType =>
case _: TimestampType =>
case _ => return false
}
}
@@ -67,14 +67,14 @@ object SparkArrowUtil {
case ArrowType.Binary.INSTANCE => BinaryType
case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
// TODO: Time unit is not handled.
case _: ArrowType.Timestamp => TimestampType
// case ArrowType.Null.INSTANCE => NullType
case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
}

/** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */
def toArrowField(name: String, dt: DataType, nullable: Boolean, timeZoneId: String): Field = {
dt match {
case ArrayType(elementType, containsNull) =>
val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
@@ -812,14 +812,14 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-35640: read binary as timestamp should throw schema incompatible error")
// Rewrite to align exception msg.
.exclude("SPARK-35640: int as long should throw schema incompatible error")
// Timestamp is read as INT96.
.exclude("read dictionary and plain encoded timestamp_millis written as INT64")
enableSuite[GlutenParquetV1PartitionDiscoverySuite]
.exclude("SPARK-7847: Dynamic partition directory path escaping and unescaping")
.exclude(
"SPARK-22109: Resolve type conflicts between strings and timestamps in partition column")
// Timezone is not supported yet.
.exclude("Resolve type conflicts - decimals, dates and timestamps in partition column")
enableSuite[GlutenParquetV2PartitionDiscoverySuite]
.exclude("SPARK-7847: Dynamic partition directory path escaping and unescaping")
.exclude(
"SPARK-22109: Resolve type conflicts between strings and timestamps in partition column")
// Timezone is not supported yet.
.exclude("Resolve type conflicts - decimals, dates and timestamps in partition column")
enableSuite[GlutenParquetProtobufCompatibilitySuite]
enableSuite[GlutenParquetV1QuerySuite]
// spark.sql.parquet.enableVectorizedReader=true not supported
Expand All @@ -828,13 +828,21 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("returning batch for wide table")
// decimal failed ut
.exclude("SPARK-34212 Parquet should read decimals correctly")
// Timestamp is read as INT96.
.exclude("SPARK-10634 timestamp written and read as INT64 - truncation")
.exclude("Migration from INT96 to TIMESTAMP_MICROS timestamp type")
.exclude("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS")
enableSuite[GlutenParquetV2QuerySuite]
// spark.sql.parquet.enableVectorizedReader=true not supported
.exclude("SPARK-16632: read Parquet int32 as ByteType and ShortType")
.exclude("Enabling/disabling ignoreCorruptFiles")
.exclude("returning batch for wide table")
// decimal failed ut
.exclude("SPARK-34212 Parquet should read decimals correctly")
// Timestamp is read as INT96.
.exclude("SPARK-10634 timestamp written and read as INT64 - truncation")
.exclude("Migration from INT96 to TIMESTAMP_MICROS timestamp type")
.exclude("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS")
// requires resource files from Vanilla spark jar
// enableSuite[GlutenParquetRebaseDatetimeV1Suite]
// enableSuite[GlutenParquetRebaseDatetimeV2Suite]
@@ -1030,5 +1038,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
enableSuite[GlutenSimpleShowCreateTableSuite]
enableSuite[GlutenStatisticsCollectionSuite]
// TODO: bug fix on TableScan.
.exclude("store and retrieve column stats in different time zones")
enableSuite[FallbackStrategiesSuite]
}
@@ -17,9 +17,19 @@

package org.apache.spark.sql

import org.apache.spark.SparkConf

class GlutenFileSourceSQLInsertTestSuite extends FileSourceSQLInsertTestSuite
with GlutenSQLTestsTrait {
override def sparkConf: SparkConf = {
// Timezone is not supported yet.
super.sparkConf.set("spark.sql.session.timeZone", "UTC")
}
}

class GlutenDSV2SQLInsertTestSuite extends DSV2SQLInsertTestSuite {
override def sparkConf: SparkConf = {
// Timezone is not supported yet.
super.sparkConf.set("spark.sql.session.timeZone", "UTC")
}
}