Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,24 +399,30 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
} else if (
sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) {
bool hashTableBuildOncePerExecutorEnabled =
SubstraitParser::configSetInOptimization(
sJoin.advanced_extension(), "isHashTableBuildOncePerExecutor=");

std::string hashTableId = sJoin.hashtableid();

std::shared_ptr<core::OpaqueHashTable> opaqueSharedHashTable = nullptr;
bool joinHasNullKeys = false;

try {
auto hashTableBuilder = ObjectStore::retrieve<gluten::HashTableBuilder>(getJoin(hashTableId));
joinHasNullKeys = hashTableBuilder->joinHasNullKeys();
auto originalShared = hashTableBuilder->hashTable();
opaqueSharedHashTable = std::shared_ptr<core::OpaqueHashTable>(
originalShared, reinterpret_cast<core::OpaqueHashTable*>(originalShared.get()));

LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId;
} catch (const std::exception& e) {
LOG(WARNING)
<< "Error retrieving HashTable from ObjectStore: " << e.what()
<< ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false.";
opaqueSharedHashTable = nullptr;
if (hashTableBuildOncePerExecutorEnabled) {
try {
auto hashTableBuilder = ObjectStore::retrieve<gluten::HashTableBuilder>(getJoin(hashTableId));
joinHasNullKeys = hashTableBuilder->joinHasNullKeys();
auto originalShared = hashTableBuilder->hashTable();
opaqueSharedHashTable = std::shared_ptr<core::OpaqueHashTable>(
originalShared, reinterpret_cast<core::OpaqueHashTable*>(originalShared.get()));

LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId;
} catch (const std::exception& e) {
throw GlutenException(
"Error retrieving HashTable from ObjectStore: " + std::string(e.what()) +
" You can set spark.gluten.velox.buildHashTableOncePerExecutor.enabled"
" to false as workaround.");
}
}

// Create HashJoinNode node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.expression._
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
Expand Down Expand Up @@ -281,6 +282,12 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
// isBHJ: 0 for SHJ, 1 for BHJ
// isNullAwareAntiJoin: 0 for false, 1 for true
// buildHashTableId: the unique id for the hash table of build plan
val isHashTableBuildOncePerExecutor =
if (
BackendsApiManager.getSettings.enableHashTableBuildOncePerExecutor() &&
GlutenConfig.get.enableColumnarBroadcastExchange
) { 1 }
else 0
joinParametersStr
.append("isBHJ=")
.append(isBHJ)
Expand All @@ -291,6 +298,9 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
.append("buildHashTableId=")
.append(buildHashTableId)
.append("\n")
.append("isHashTableBuildOncePerExecutor=")
.append(isHashTableBuildOncePerExecutor)
.append("\n")
.append("isExistenceJoin=")
.append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0)
.append("\n")
Expand Down
Loading