Commit da72337: fix

rui-mo committed May 9, 2023
1 parent ff21af9

Showing 6 changed files with 82 additions and 43 deletions.
4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxColumnarToRowConverter.cc
@@ -185,10 +185,10 @@ arrow::Status VeloxColumnarToRowConverter::Write() {
SetNullAt(buffer_address_, offsets_[row_idx], field_offset, col_idx);
}
} else {
-        auto longDecimal = vec->asFlatVector<velox::UnscaledLongDecimal>()->rawValues();
if (flag) {
SetNullAt(buffer_address_, offsets_[row_idx], field_offset, col_idx);
} else {
+          auto longDecimal = vec->asFlatVector<velox::UnscaledLongDecimal>()->rawValues();
int32_t size;
velox::int128_t veloxInt128 = longDecimal[row_idx].unscaledValue();
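
Note on the hunk above: moving the rawValues() call inside the non-null branch avoids touching the decimal buffer for null rows, and the `size` declared next to it is the byte width used when the unscaled int128 is written out. A standalone sketch of one way that width is computed, assuming Spark-style minimal two's-complement decimal encoding (MinimalDecimalBytes is a hypothetical name, not a helper from this file):

#include <cstdint>
#include <cstdio>

using int128_t = __int128; // GCC/Clang builtin, same width as velox::int128_t

// Minimal number of bytes that hold v in two's-complement form: count
// magnitude bits (complement for negatives), add one sign bit, round up.
static int32_t MinimalDecimalBytes(int128_t v) {
  unsigned __int128 u = (v >= 0) ? (unsigned __int128)v : ~(unsigned __int128)v;
  int bits = 1; // sign bit
  while (u != 0) {
    u >>= 1;
    ++bits;
  }
  return (bits + 7) / 8;
}

int main() {
  printf("%d %d %d\n",
         MinimalDecimalBytes(0),    // 1
         MinimalDecimalBytes(127),  // 1
         MinimalDecimalBytes(128)); // 2
  return 0;
}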

@@ -212,7 +212,7 @@ arrow::Status VeloxColumnarToRowConverter::Write() {
}
break;
}
-    case arrow::Time64Type::type_id: {
+    case arrow::TimestampType::type_id: {
SERIALIZE_COLUMN(TimestampType);
break;
}
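
The Time64Type-to-TimestampType fix above matters because Arrow's Time64 is a time-of-day type while Timestamp is an instant since the Unix epoch; they carry distinct type ids, so the serializer's switch never matched Spark timestamp columns before. A minimal sketch showing the distinct ids (standalone, not code from this patch):

#include <arrow/api.h>
#include <iostream>

int main() {
  // Both are 64-bit microsecond types, but they are distinct Arrow types.
  std::shared_ptr<arrow::DataType> ts = arrow::timestamp(arrow::TimeUnit::MICRO);
  std::shared_ptr<arrow::DataType> t64 = arrow::time64(arrow::TimeUnit::MICRO);
  std::cout << (ts->id() == arrow::TimestampType::type_id) << std::endl; // 1
  std::cout << (t64->id() == arrow::Time64Type::type_id) << std::endl;   // 1
  std::cout << (ts->id() == t64->id()) << std::endl;                     // 0
  return 0;
}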
101 changes: 64 additions & 37 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -193,9 +193,10 @@ arrow::Status VeloxShuffleWriter::Split(ColumnarBatch* cb) {

auto result = arrow::ImportRecordBatch(&arrowArray, schema_);
RETURN_NOT_OK(result);
+    std::shared_ptr<arrow::RecordBatch> batchPtr = result.ValueOrDie();

// 2. call CacheRecordBatch with RecordBatch
-    RETURN_NOT_OK(CacheRecordBatch(0, *(*result)));
+    RETURN_NOT_OK(CacheRecordBatch(0, *batchPtr));
} else {
auto& rv = *veloxColumnBatch->getFlattenedRowVector();
RETURN_NOT_OK(InitFromRowVector(rv));
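
The batchPtr change above replaces the double dereference *(*result), which first unwrapped the arrow::Result and then the shared_ptr it contains, with a named variable. A minimal sketch of the same pattern in isolation (Consume is a hypothetical function; the status check mirrors RETURN_NOT_OK(result) above):

#include <arrow/api.h>

arrow::Status Consume(arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybeBatch) {
  // Propagate failure before unwrapping the Result.
  ARROW_RETURN_NOT_OK(maybeBatch.status());
  // Safe after the check; names the batch instead of chaining dereferences.
  std::shared_ptr<arrow::RecordBatch> batch = maybeBatch.ValueOrDie();
  // Equivalent shortcut: ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybeBatch));
  return batch->Validate();
}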
@@ -346,25 +347,49 @@ arrow::Status VeloxShuffleWriter::SplitFixedWidthValueBuffer(const velox::RowVec
case 32:
RETURN_NOT_OK(SplitFixedType<uint32_t>(src_addr, dst_addrs));
break;
-      case 64:
+      case 64: {
+        if (column->type()->kind() == velox::TypeKind::TIMESTAMP) {
+          std::transform(
+              dst_addrs.begin(),
+              dst_addrs.end(),
+              partition_buffer_idx_base_.begin(),
+              partition_buffer_idx_offset_.begin(),
+              [](uint8_t* x, uint32_t y) { return x + y * sizeof(int64_t); });
+
+          const auto& tsVec = column->asFlatVector<velox::Timestamp>();
+          for (uint32_t pid = 0; pid < num_partitions_; ++pid) {
+            auto dst_pid_base = reinterpret_cast<int64_t*>(partition_buffer_idx_offset_[pid]);
+            auto pos = partition_2_row_offset_[pid];
+            auto end = partition_2_row_offset_[pid + 1];
+            for (; pos < end; ++pos) {
+              auto row_id = row_offset_2_row_id_[pos];
+              if (tsVec->mayHaveNulls() && tsVec->isNullAt(row_id)) {
+                *dst_pid_base++ = 0;
+              } else {
+                *dst_pid_base++ = tsVec->valueAt(row_id).toMicros(); // copy
+              }
+            }
+          }
+          break;
+        } else {
#ifdef PROCESSAVX
          std::transform(
              dst_addrs.begin(),
              dst_addrs.end(),
              partition_buffer_idx_base_.begin(),
              partition_buffer_idx_offset_.begin(),
              [](uint8_t* x, row_offset_type y) { return x + y * sizeof(uint64_t); });
          for (auto pid = 0; pid < num_partitions_; pid++) {
            auto dst_pid_base = reinterpret_cast<uint64_t*>(partition_buffer_idx_offset_[pid]); /*32k*/
            auto r = partition_2_row_offset_[pid]; /*8k*/
            auto size = partition_2_row_offset_[pid + 1];
#if 1
            for (r; r < size && (((uint64_t)dst_pid_base & 0x1f) > 0); r++) {
              auto src_offset = row_offset_2_row_id_[r]; /*16k*/
              *dst_pid_base = reinterpret_cast<uint64_t*>(src_addr)[src_offset]; /*64k*/
              _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2);
              dst_pid_base += 1;
            }
#if 0
for (r; r+4<size; r+=4)
{
@@ -383,31 +408,33 @@ arrow::Status VeloxShuffleWriter::SplitFixedWidthValueBuffer(const velox::RowVec
dst_pid_base+=4;
}
#endif
            for (r; r + 2 < size; r += 2) {
              __m128i src_offset_2x = _mm_cvtsi32_si128(*((int32_t*)(row_offset_2_row_id_.data() + r)));
              src_offset_2x = _mm_shufflelo_epi16(src_offset_2x, 0x98);

              __m128i src_2x = _mm_i32gather_epi64((const long long int*)src_addr, src_offset_2x, 8);
              _mm_store_si128((__m128i*)dst_pid_base, src_2x);
              //_mm_stream_si128((__m128i*)dst_pid_base,src_2x);

              _mm_prefetch(&(src_addr)[(uint32_t)row_offset_2_row_id_[r] * sizeof(uint64_t) + 64], _MM_HINT_T2);
              _mm_prefetch(&(src_addr)[(uint32_t)row_offset_2_row_id_[r + 1] * sizeof(uint64_t) + 64], _MM_HINT_T2);
              dst_pid_base += 2;
            }
#endif
            for (r; r < size; r++) {
              auto src_offset = row_offset_2_row_id_[r]; /*16k*/
              *dst_pid_base = reinterpret_cast<const uint64_t*>(src_addr)[src_offset]; /*64k*/
              _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2);
              dst_pid_base += 1;
            }
          }
          break;
#else
          RETURN_NOT_OK(SplitFixedType<uint64_t>(src_addr, dst_addrs));
#endif
          break;
+        }
+      }
#if defined(__x86_64__)
case 128: // arrow::Decimal128Type::type_id
// too bad gcc generates movdqa even we use __m128i_u data type.
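
The TIMESTAMP branch added in the 64-bit case above scatters each row's value into its partition's buffer as int64 microseconds, writing 0 for nulls. A simplified standalone sketch of that scatter, with plain int64_t values standing in for velox::Timestamp::toMicros() results; all names here are hypothetical:

#include <cstdint>
#include <vector>

// partition2RowOffset[pid]..partition2RowOffset[pid + 1] delimits the rows of
// partition pid, and rowOffset2RowId maps those positions back to row ids,
// mirroring partition_2_row_offset_ / row_offset_2_row_id_ in the writer.
void ScatterByPartition(
    const std::vector<int64_t>& micros,
    const std::vector<uint32_t>& partition2RowOffset, // size = numPartitions + 1
    const std::vector<uint32_t>& rowOffset2RowId,
    std::vector<std::vector<int64_t>>& partitionBuffers) {
  uint32_t numPartitions = partition2RowOffset.size() - 1;
  for (uint32_t pid = 0; pid < numPartitions; ++pid) {
    for (auto pos = partition2RowOffset[pid]; pos < partition2RowOffset[pid + 1]; ++pos) {
      // The real code writes 0 here when the row is null.
      partitionBuffers[pid].push_back(micros[rowOffset2RowId[pos]]);
    }
  }
}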
@@ -68,7 +68,10 @@ case class TransformPreOverrides(isAdaptiveContextOrTopParentExchange: Boolean)
val project = ProjectExec(
Seq(Alias(hashExpression, "hash_partition_key")()) ++ child.output, child)
AddTransformHintRule().apply(project)
-      replaceWithTransformerPlan(project)
+      TransformHints.getHint(project) match {
+        case TRANSFORM_SUPPORTED() => replaceWithTransformerPlan(project)
+        case TRANSFORM_UNSUPPORTED() => project
+      }
}

/**
@@ -282,6 +282,7 @@ case class GlutenRowToArrowColumnarExec(child: SparkPlan)
case _: BinaryType =>
case _: DecimalType =>
case _: DateType =>
+      case _: TimestampType =>
case _ => return false
}
}
@@ -67,14 +67,14 @@ object SparkArrowUtil {
case ArrowType.Binary.INSTANCE => BinaryType
case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
-    case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => TimestampType
+    // TODO: Time unit is not handled.
+    case _: ArrowType.Timestamp => TimestampType
// case ArrowType.Null.INSTANCE => NullType
case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
}

/** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */
-  def toArrowField(
-      name: String, dt: DataType, nullable: Boolean, timeZoneId: String): Field = {
+  def toArrowField(name: String, dt: DataType, nullable: Boolean, timeZoneId: String): Field = {
dt match {
case ArrayType(elementType, containsNull) =>
val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
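
The fromArrowType change above now maps every Arrow timestamp to Spark's TimestampType regardless of unit, with the TODO noting that the unit is not handled; Spark timestamps are microsecond-based, so values in other units would need normalizing. A sketch of that normalization in C++ terms (ToMicros is a hypothetical helper, not part of this patch):

#include <arrow/api.h>

// Normalize an Arrow timestamp value to microseconds, the unit Spark's
// TimestampType stores internally. Sub-microsecond precision is truncated.
int64_t ToMicros(int64_t value, arrow::TimeUnit::type unit) {
  switch (unit) {
    case arrow::TimeUnit::SECOND:
      return value * 1000000;
    case arrow::TimeUnit::MILLI:
      return value * 1000;
    case arrow::TimeUnit::MICRO:
      return value;
    case arrow::TimeUnit::NANO:
      return value / 1000;
  }
  return value; // unreachable for a valid unit
}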
@@ -828,13 +828,21 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("returning batch for wide table")
// decimal failed ut
.exclude("SPARK-34212 Parquet should read decimals correctly")
+      // Timestamp is read as INT96.
+      .exclude("SPARK-10634 timestamp written and read as INT64 - truncation")
+      .exclude("Migration from INT96 to TIMESTAMP_MICROS timestamp type")
+      .exclude("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS")
enableSuite[GlutenParquetV2QuerySuite]
// spark.sql.parquet.enableVectorizedReader=true not supported
.exclude("SPARK-16632: read Parquet int32 as ByteType and ShortType")
.exclude("Enabling/disabling ignoreCorruptFiles")
.exclude("returning batch for wide table")
// decimal failed ut
.exclude("SPARK-34212 Parquet should read decimals correctly")
+      // Timestamp is read as INT96.
+      .exclude("SPARK-10634 timestamp written and read as INT64 - truncation")
+      .exclude("Migration from INT96 to TIMESTAMP_MICROS timestamp type")
+      .exclude("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS")
// requires resource files from Vanilla spark jar
// enableSuite[GlutenParquetRebaseDatetimeV1Suite]
// enableSuite[GlutenParquetRebaseDatetimeV2Suite]
