Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ class MetricsHandler extends MetricsApi with Logging {
"spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to spill"),
"compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to compress"),
"prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to prepare"),
"decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_decompress"),
"decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime decompress"),
"ipcTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime ipc"),
"deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime deserialize"),
"avgReadBatchNumRows" -> SQLMetrics
.createAverageMetric(sparkContext, "avg read batch num rows"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
Expand Down Expand Up @@ -329,8 +331,8 @@ class MetricsHandler extends MetricsApi with Logging {
"prepareTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to prepare left list"),
"processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to process"),
"joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to merge join"),
"totaltime_sortmergejoin" -> SQLMetrics
.createTimingMetric(sparkContext, "totaltime_sortmergejoin")
"totaltimeSortmergejoin" -> SQLMetrics
.createTimingMetric(sparkContext, "totaltime sortmergejoin")
)

override def genSortMergeJoinTransformerMetricsUpdater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,21 @@ class SparkPlanExecHandler extends SparkPlanExecApi {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val decompressTime = metrics("decompressTime")
val ipcTime = metrics("ipcTime")
val deserializeTime = metrics("deserializeTime")
if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[StructType], classOf[SQLMetric], classOf[SQLMetric])
constructor.newInstance(schema, readBatchNumRows, numOutputRows).asInstanceOf[Serializer]
} else {
new ColumnarBatchSerializer(schema, readBatchNumRows, numOutputRows, decompressTime)
new ColumnarBatchSerializer(
schema,
readBatchNumRows,
numOutputRows,
decompressTime,
ipcTime,
deserializeTime)
}
}

Expand Down
8 changes: 8 additions & 0 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ static jmethodID veloxColumnarBatchScannerNext;

static jclass shuffleReaderMetricsClass;
static jmethodID shuffleReaderMetricsSetDecompressTime;
static jmethodID shuffleReaderMetricsSetIpcTime;
static jmethodID shuffleReaderMetricsSetDeserializeTime;

class JavaInputStreamAdaptor final : public arrow::io::InputStream {
public:
Expand Down Expand Up @@ -293,6 +295,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
createGlobalClassReferenceOrError(env, "Lio/glutenproject/vectorized/ShuffleReaderMetrics;");
shuffleReaderMetricsSetDecompressTime =
getMethodIdOrError(env, shuffleReaderMetricsClass, "setDecompressTime", "(J)V");
shuffleReaderMetricsSetIpcTime = getMethodIdOrError(env, shuffleReaderMetricsClass, "setIpcTime", "(J)V");
shuffleReaderMetricsSetDeserializeTime =
getMethodIdOrError(env, shuffleReaderMetricsClass, "setDeserializeTime", "(J)V");

return jniVersion;
}
Expand Down Expand Up @@ -1056,6 +1061,9 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_

auto reader = executionCtx->getShuffleReader(shuffleReaderHandle);
env->CallVoidMethod(metrics, shuffleReaderMetricsSetDecompressTime, reader->getDecompressTime());
env->CallVoidMethod(metrics, shuffleReaderMetricsSetIpcTime, reader->getIpcTime());
env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getDeserializeTime());

checkException(env);
JNI_METHOD_END()
}
Expand Down
20 changes: 14 additions & 6 deletions cpp/core/shuffle/ShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ using namespace gluten;
class ShuffleReaderOutStream : public ColumnarBatchIterator {
public:
ShuffleReaderOutStream(
const ReaderOptions& options,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::io::InputStream>& in,
const ReaderOptions& options)
: options_(options), in_(in) {
const std::function<void(int64_t)> ipcTimeAccumulator)
: options_(options), in_(in), ipcTimeAccumulator_(ipcTimeAccumulator) {
if (options.compression_type != arrow::Compression::UNCOMPRESSED) {
writeSchema_ = toCompressWriteSchema(*schema);
} else {
Expand All @@ -44,20 +45,29 @@ class ShuffleReaderOutStream : public ColumnarBatchIterator {
std::shared_ptr<ColumnarBatch> next() override {
std::shared_ptr<arrow::RecordBatch> arrowBatch;
std::unique_ptr<arrow::ipc::Message> messageToRead;

int64_t ipcTime = 0;
TIME_NANO_START(ipcTime);

GLUTEN_ASSIGN_OR_THROW(messageToRead, arrow::ipc::ReadMessage(in_.get()))
if (messageToRead == nullptr) {
return nullptr;
}

GLUTEN_ASSIGN_OR_THROW(
arrowBatch, arrow::ipc::ReadRecordBatch(*messageToRead, writeSchema_, nullptr, options_.ipc_read_options))

TIME_NANO_END(ipcTime);
ipcTimeAccumulator_(ipcTime);

std::shared_ptr<ColumnarBatch> glutenBatch = std::make_shared<ArrowColumnarBatch>(arrowBatch);
return glutenBatch;
}

private:
ReaderOptions options_;
std::shared_ptr<arrow::io::InputStream> in_;
std::function<void(int64_t)> ipcTimeAccumulator_;
std::shared_ptr<arrow::Schema> writeSchema_;
};
} // namespace
Expand All @@ -75,16 +85,14 @@ ShuffleReader::ShuffleReader(
: pool_(pool), options_(std::move(options)), schema_(schema) {}

// Wraps the given input stream in a ResultIterator over ShuffleReaderOutStream.
// The lambda accumulates per-batch IPC read time into this reader's ipcTime_
// counter, later surfaced to Java through getIpcTime().
// BUG FIX: a stale duplicate call with the old constructor signature
// (schema_, in, options_) was left above the updated call; it is removed here.
std::shared_ptr<ResultIterator> ShuffleReader::readStream(std::shared_ptr<arrow::io::InputStream> in) {
  return std::make_shared<ResultIterator>(std::make_unique<ShuffleReaderOutStream>(
      options_, schema_, in, [this](int64_t ipcTime) { this->ipcTime_ += ipcTime; }));
}

// Closes the reader. Currently a no-op that always reports success;
// subclasses owning real resources would override/extend this.
arrow::Status ShuffleReader::close() {
return arrow::Status::OK();
}

int64_t ShuffleReader::getDecompressTime() {
return decompressTime_;
}
// Non-owning accessor for the reader's Arrow memory pool member.
const std::shared_ptr<arrow::MemoryPool>& ShuffleReader::getPool() const {
return pool_;
}
Expand Down
18 changes: 17 additions & 1 deletion cpp/core/shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,28 @@ class ShuffleReader {
virtual std::shared_ptr<ResultIterator> readStream(std::shared_ptr<arrow::io::InputStream> in);

arrow::Status close();
int64_t getDecompressTime();

// Total time spent decompressing shuffle data, accumulated into decompressTime_
// by subclasses (nanoseconds, per the TIME_NANO_* accumulation — TODO confirm unit).
int64_t getDecompressTime() const {
return decompressTime_;
}

// Total time spent in Arrow IPC message reads, accumulated into ipcTime_
// by the readStream() accumulator callback.
int64_t getIpcTime() const {
return ipcTime_;
}

// Total time spent deserializing row vectors, accumulated into deserializeTime_
// by subclass readers (e.g. the Velox reader's accumulator callback).
int64_t getDeserializeTime() const {
return deserializeTime_;
}

const std::shared_ptr<arrow::MemoryPool>& getPool() const;

protected:
std::shared_ptr<arrow::MemoryPool> pool_;

int64_t decompressTime_ = 0;
int64_t ipcTime_ = 0;
int64_t deserializeTime_ = 0;

ReaderOptions options_;

private:
Expand Down
14 changes: 14 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"

#include "shuffle/VeloxShuffleReader.h"

#ifdef GLUTEN_ENABLE_QAT
#include "utils/qat/QatCodec.h"
#endif
Expand Down Expand Up @@ -96,6 +99,9 @@ const std::string kMaxSpillFileSizeDefault = std::to_string(20L * 1024 * 1024);
// backtrace allocation
const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";

// VeloxShuffleReader print flag.
const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag";

} // namespace

namespace gluten {
Expand Down Expand Up @@ -145,6 +151,14 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
}
}

// Set veloxShuffleReaderPrintFlag
{
auto got = conf.find(kVeloxShuffleReaderPrintFlag);
if (got != conf.end()) {
gluten::veloxShuffleReaderPrintFlag = (got->second == "true");
}
}

// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem(conf);
Expand Down
67 changes: 64 additions & 3 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ using namespace facebook::velox;

namespace gluten {

bool veloxShuffleReaderPrintFlag = true;

namespace {

struct BufferViewReleaser {
Expand Down Expand Up @@ -327,6 +329,7 @@ RowVectorPtr readRowVector(
CodecBackend codecBackend,
CompressionMode compressionMode,
int64_t& decompressTime,
int64_t& deserializeTime,
arrow::MemoryPool* arrowPool,
memory::MemoryPool* pool) {
auto header = readColumnBuffer(batch, 0);
Expand All @@ -349,7 +352,12 @@ RowVectorPtr readRowVector(
getUncompressedBuffers(batch, arrowPool, codec.get(), compressionMode, buffers);
TIME_NANO_END(decompressTime);
}
return deserialize(rowType, length, buffers, pool);

TIME_NANO_START(deserializeTime);
auto rv = deserialize(rowType, length, buffers, pool);
TIME_NANO_END(deserializeTime);

return rv;
}

class VeloxShuffleReaderOutStream : public ColumnarBatchIterator {
Expand All @@ -359,13 +367,15 @@ class VeloxShuffleReaderOutStream : public ColumnarBatchIterator {
const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
const ReaderOptions& options,
const RowTypePtr& rowType,
const std::function<void(int64_t&)> decompressionTimeAccumulator,
const std::function<void(int64_t)> decompressionTimeAccumulator,
const std::function<void(int64_t)> deserializeTimeAccumulator,
ResultIterator& in)
: pool_(pool),
veloxPool_(veloxPool),
options_(options),
rowType_(rowType),
decompressionTimeAccumulator_(decompressionTimeAccumulator),
deserializeTimeAccumulator_(deserializeTimeAccumulator),
in_(std::move(in)) {}

std::shared_ptr<ColumnarBatch> next() override {
Expand All @@ -374,16 +384,22 @@ class VeloxShuffleReaderOutStream : public ColumnarBatchIterator {
}
auto batch = in_.next();
auto rb = std::dynamic_pointer_cast<ArrowColumnarBatch>(batch)->getRecordBatch();

int64_t decompressTime = 0LL;
int64_t deserializeTime = 0LL;

auto vp = readRowVector(
*rb,
rowType_,
options_.codec_backend,
options_.compression_mode,
decompressTime,
deserializeTime,
pool_.get(),
veloxPool_.get());

decompressionTimeAccumulator_(decompressTime);
deserializeTimeAccumulator_(deserializeTime);
return std::make_shared<VeloxColumnarBatch>(vp);
}

Expand All @@ -392,10 +408,47 @@ class VeloxShuffleReaderOutStream : public ColumnarBatchIterator {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
ReaderOptions options_;
facebook::velox::RowTypePtr rowType_;
std::function<void(int64_t&)> decompressionTimeAccumulator_;

std::function<void(int64_t)> decompressionTimeAccumulator_;
std::function<void(int64_t)> deserializeTimeAccumulator_;

ResultIterator in_;
};

// Human-readable name for a CompressionMode value, used in startup logging.
std::string getCompressionMode(CompressionMode mode) {
  switch (mode) {
    case CompressionMode::BUFFER:
      return "BUFFER";
    case CompressionMode::ROWVECTOR:
      return "ROWVECTOR";
    default:
      return "UNKNOWN";
  }
}

// Human-readable name for a CodecBackend value, used in startup logging.
std::string getCodecBackend(CodecBackend backend) {
  switch (backend) {
    case CodecBackend::QAT:
      return "QAT";
    case CodecBackend::IAA:
      return "IAA";
    default:
      return "NONE";
  }
}

// Human-readable name for an arrow::Compression::type value, used in startup logging.
std::string getCompressionType(arrow::Compression::type compression) {
  switch (compression) {
    case arrow::Compression::UNCOMPRESSED:
      return "UNCOMPRESSED";
    case arrow::Compression::LZ4_FRAME:
      return "LZ4_FRAME";
    case arrow::Compression::ZSTD:
      return "ZSTD";
    case arrow::Compression::GZIP:
      return "GZIP";
    default:
      return "UNKNOWN";
  }
}

} // namespace

VeloxShuffleReader::VeloxShuffleReader(
Expand All @@ -405,6 +458,13 @@ VeloxShuffleReader::VeloxShuffleReader(
std::shared_ptr<memory::MemoryPool> veloxPool)
: ShuffleReader(schema, options, pool), veloxPool_(std::move(veloxPool)) {
rowType_ = asRowType(gluten::fromArrowSchema(schema));
if (gluten::veloxShuffleReaderPrintFlag) {
std::ostringstream oss;
oss << "VeloxShuffleReader create, compression_type:" << getCompressionType(options.compression_type);
oss << " codec_backend:" << getCodecBackend(options.codec_backend);
oss << " compression_mode:" << getCompressionMode(options.compression_mode);
LOG(INFO) << oss.str();
}
}

std::shared_ptr<ResultIterator> VeloxShuffleReader::readStream(std::shared_ptr<arrow::io::InputStream> in) {
Expand All @@ -415,6 +475,7 @@ std::shared_ptr<ResultIterator> VeloxShuffleReader::readStream(std::shared_ptr<a
options_,
rowType_,
[this](int64_t decompressionTime) { this->decompressTime_ += decompressionTime; },
[this](int64_t deserializeTime) { this->deserializeTime_ += deserializeTime; },
*wrappedIn));
}

Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/shuffle/VeloxShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/vector/ComplexVector.h"

namespace gluten {

class VeloxShuffleReader final : public ShuffleReader {
public:
VeloxShuffleReader(
Expand All @@ -37,4 +38,6 @@ class VeloxShuffleReader final : public ShuffleReader {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
};

extern bool veloxShuffleReaderPrintFlag;

} // namespace gluten
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

public class ShuffleReaderMetrics {
private long decompressTime;
private long ipcTime;
private long deserializeTime;

public void setDecompressTime(long decompressTime) {
this.decompressTime = decompressTime;
Expand All @@ -26,4 +28,20 @@ public void setDecompressTime(long decompressTime) {
// Returns the decompression time recorded by native code via JNI (setDecompressTime).
public long getDecompressTime() {
return decompressTime;
}

// Called from native code (JniWrapper shuffleReaderMetricsSetIpcTime) to record IPC read time.
public void setIpcTime(long ipcTime) {
this.ipcTime = ipcTime;
}

// Returns the IPC read time recorded by native code via JNI (setIpcTime).
public long getIpcTime() {
return ipcTime;
}

// Called from native code (JniWrapper shuffleReaderMetricsSetDeserializeTime)
// to record deserialization time.
// BUG FIX: the parameter was named `ipcTime`, so the body's
// `this.deserializeTime = deserializeTime;` was a self-assignment that
// silently discarded the passed value and left the field at 0.
public void setDeserializeTime(long deserializeTime) {
this.deserializeTime = deserializeTime;
}

// Returns the deserialization time recorded by native code via JNI (setDeserializeTime).
public long getDeserializeTime() {
return deserializeTime;
}
}
Loading