[ASTERIXDB-3004][RT] Improve hash join performance
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Improve hash join performance when joined values are NULL/MISSING
- Add SqlppHashJoinRQGTest to test different hash join scenarios

Change-Id: I8f0afb05908e8281f2865775e074d459964fe989
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14784
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Dmitry Lychagin committed Jan 14, 2022
1 parent c503ef8 commit 64786ba65f6b1b950534a46736c2b2eb3b6605e8
Showing 25 changed files with 664 additions and 195 deletions.
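As the Details above note, the core of this change is where NULL/MISSING join keys get screened out: instead of a pair-wise predicate evaluator re-checked for every candidate (build, probe) tuple pair, each side now gets its own evaluator that rejects a tuple once, before it is hashed. The following is a standalone, runnable sketch of that idea; it is illustrative only (not AsterixDB code), and every name in it is made up for the example:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PerSideFilterSketch {
    public static void main(String[] args) {
        List<Integer> build = Arrays.asList(1, null, 2, null);
        List<Integer> probe = Arrays.asList(null, 2, 3);
        // Stand-in for the per-side predicate evaluator: reject tuples whose
        // join key is NULL before they ever reach the hash table.
        Predicate<Integer> keyPresent = k -> k != null;

        Map<Integer, Integer> counts = new HashMap<>();
        for (Integer key : build) {
            if (keyPresent.test(key)) { // checked once, at build time
                counts.merge(key, 1, Integer::sum);
            }
        }
        long matches = 0;
        for (Integer key : probe) {
            if (keyPresent.test(key)) { // checked once, at probe time
                matches += counts.getOrDefault(key, 0);
            }
        }
        System.out.println(matches); // 1: only key 2 joins
    }
}

The actual interface and operator changes appear in the hunks below.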
@@ -73,11 +73,10 @@ public static List<ILibraryManager> setUp(boolean cleanup, String configFile,
}
integrationUtil.init(cleanup, configFile);

if (LOGGER.isInfoEnabled()) {
LOGGER.info("initializing HDFS");
}

if (startHdfs) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("initializing HDFS");
}
HDFSCluster.getInstance().setup();
}

Large diffs are not rendered by default.

@@ -127,7 +127,7 @@ public class SqlppNumericIndexRQGTest {
@BeforeClass
public static void setUp() throws Exception {
testExecutor = new TestExecutor();
LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor);
LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor, false);

StringBuilder sb = new StringBuilder(2048);
addDropDataverse(sb, DATAVERSE_NAME);
@@ -293,7 +293,7 @@ private static void addCreateDataset(StringBuilder sb, String dataverseName, Str
}
}
sb.append(") ");
sb.append("OPEN TYPE PRIMARY KEY id;\n");
sb.append("OPEN TYPE PRIMARY KEY ").append(ID_COLUMN_NAME).append(";\n");
}

private static void addLoadDataset(StringBuilder sb, String dataverseName, String datasetName) {
@@ -381,7 +381,7 @@ protected static long getLongConfigurationProperty(String propertyName, long def

protected static void startAsterix() throws Exception {
testExecutor = new TestExecutor();
LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false);
loadAsterixData();
}

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Test hash join when values on both sides are MISSING
*/

with
R as (
from range(1, 50000) r
select (case when get_year(current_date()) > 0 then missing else r end) as r
),

S as (
from range(1, 50000) s
select (case when get_year(current_date()) > 0 then missing else s end) as s
)

select count(*) cnt
from R, S
where R.r = S.s;
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Test hash join when values on both sides are NULL
*/

with
R as (
from range(1, 50000) r
select (case when get_year(current_date()) > 0 then null else r end) as r
),

S as (
from range(1, 50000) s
select (case when get_year(current_date()) > 0 then null else s end) as s
)

select count(*) cnt
from R, S
where R.r = S.s;
@@ -0,0 +1 @@
{ "cnt": 0 }
@@ -0,0 +1 @@
{ "cnt": 0 }
@@ -6629,6 +6629,11 @@
<output-dir compare="Text">hash_join_array</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="join">
<compilation-unit name="hash_join_missing">
<output-dir compare="Text">hash_join_missing</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="join">
<compilation-unit name="hash_join_record">
<output-dir compare="Text">hash_join_record</output-dir>
@@ -26,31 +26,24 @@
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;

/*
Provides PredicateEvaluator for equi-join cases to properly take care of NULL fields, being compared with each other.
If any of the join keys, from either side, is NULL, record should not pass equi-join condition.
*/
/**
* Provides PredicateEvaluator for equi-join cases to disqualify tuples having NULL/MISSING fields.
* If any of the join keys, from either side, is NULL/MISSING, the tuple will not pass the equi-join condition.
*/
public class PredicateEvaluatorFactoryProvider implements IPredicateEvaluatorFactoryProvider {

private static final long serialVersionUID = 1L;
public static final PredicateEvaluatorFactoryProvider INSTANCE = new PredicateEvaluatorFactoryProvider();

@Override
public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[] keys0, final int[] keys1) {
public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[] keys) {

return new IPredicateEvaluatorFactory() {
private static final long serialVersionUID = 1L;

@Override
public IPredicateEvaluator createPredicateEvaluator() {
return new IPredicateEvaluator() {

@Override
public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1,
int tupId1) {
return noNullOrMissingInKeys(fta0, tupId0, keys0) && noNullOrMissingInKeys(fta1, tupId1, keys1);
}
};
return (fta, tupId) -> noNullOrMissingInKeys(fta, tupId, keys);
}
};
}
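With the provider now taking a single key array, a join operator obtains one factory per input and screens tuples independently; the lambda above also makes IPredicateEvaluator effectively a functional interface. Below is a minimal usage sketch against the interfaces in this commit; buildKeys, accessor, and tupleIndex are hypothetical locals that the surrounding operator would supply:

int[] buildKeys = { 0 }; // column indices of this side's join keys (example value)
IPredicateEvaluatorFactory factory =
        PredicateEvaluatorFactoryProvider.INSTANCE.getPredicateEvaluatorFactory(buildKeys);
IPredicateEvaluator evaluator = factory.createPredicateEvaluator();

// Inside the operator's frame-processing loop:
if (evaluator.evaluate(accessor, tupleIndex)) {
    // no join key in this tuple is NULL/MISSING: it may participate in the join
} else {
    // tuple is disqualified up front, before any hashing or comparison
}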
@@ -109,10 +109,11 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext
IBinaryHashFunctionFamily[] rightHashFunFamilies =
JobGenHelper.variablesToBinaryHashFunctionFamilies(keysRightBranch, env, context);

IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
context.getPredicateEvaluatorFactoryProvider();
IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
: predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
IPredicateEvaluatorFactoryProvider predEvalFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
IPredicateEvaluatorFactory leftPredEvalFactory =
predEvalFactoryProvider == null ? null : predEvalFactoryProvider.getPredicateEvaluatorFactory(keysLeft);
IPredicateEvaluatorFactory rightPredEvalFactory = predEvalFactoryProvider == null ? null
: predEvalFactoryProvider.getPredicateEvaluatorFactory(keysRight);

RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
@@ -131,7 +132,7 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext

opDesc = generateOptimizedHashJoinRuntime(context, joinOp, inputSchemas, keysLeft, keysRight,
leftHashFunFamilies, rightHashFunFamilies, comparatorFactory, reverseComparatorFactory,
predEvaluatorFactory, recDescriptor, spec);
leftPredEvalFactory, rightPredEvalFactory, recDescriptor, spec);
opDesc.setSourceLocation(op.getSourceLocation());
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);

@@ -145,20 +146,20 @@ private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext conte
AbstractBinaryJoinOperator joinOp, IOperatorSchema[] inputSchemas, int[] keysLeft, int[] keysRight,
IBinaryHashFunctionFamily[] leftHashFunFamilies, IBinaryHashFunctionFamily[] rightHashFunFamilies,
ITuplePairComparatorFactory comparatorFactory, ITuplePairComparatorFactory reverseComparatorFactory,
IPredicateEvaluatorFactory predEvaluatorFactory, RecordDescriptor recDescriptor,
IOperatorDescriptorRegistry spec) throws AlgebricksException {
IPredicateEvaluatorFactory leftPredEvalFactory, IPredicateEvaluatorFactory rightPredEvalFactory,
RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
switch (kind) {
case INNER:
return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
comparatorFactory, reverseComparatorFactory, predEvaluatorFactory);
comparatorFactory, reverseComparatorFactory, leftPredEvalFactory, rightPredEvalFactory);
case LEFT_OUTER:
IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context,
((LeftOuterJoinOperator) joinOp).getMissingValue(), inputSchemas[1].getSize());
return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
comparatorFactory, reverseComparatorFactory, predEvaluatorFactory, true,
comparatorFactory, reverseComparatorFactory, leftPredEvalFactory, rightPredEvalFactory, true,
nonMatchWriterFactories);
default:
throw new NotImplementedException();
@@ -44,8 +44,6 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -93,11 +91,6 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext
IBinaryHashFunctionFactory[] rightHashFunFactories =
JobGenHelper.variablesToBinaryHashFunctionFactories(keysRightBranch, env, context);

IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
context.getPredicateEvaluatorFactoryProvider();
IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
: predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);

RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
@@ -116,15 +109,14 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext
switch (kind) {
case INNER:
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
rightHashFunFactories, comparatorFactory, recDescriptor, tableSize, predEvaluatorFactory,
memSizeInFrames);
rightHashFunFactories, comparatorFactory, recDescriptor, tableSize, memSizeInFrames);
break;
case LEFT_OUTER:
IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context,
((LeftOuterJoinOperator) joinOp).getMissingValue(), inputSchemas[1].getSize());
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
rightHashFunFactories, comparatorFactory, predEvaluatorFactory, recDescriptor, true,
nonMatchWriterFactories, tableSize, memSizeInFrames);
rightHashFunFactories, comparatorFactory, recDescriptor, true, nonMatchWriterFactories,
tableSize, memSizeInFrames);
break;
default:
throw new NotImplementedException();
@@ -21,9 +21,6 @@

import org.apache.hyracks.api.comm.IFrameTupleAccessor;

/*
* Compares two tuples to make sure that records, whose comparison keys are NULL do not pass comparator filter
*/
public interface IPredicateEvaluator {
public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
boolean evaluate(IFrameTupleAccessor fta, int tupId);
}
@@ -22,9 +22,8 @@
import java.io.Serializable;

/*
* Provides PredicateEvaluator for equi-join related operators
* Provides PredicateEvaluator
*/

public interface IPredicateEvaluatorFactory extends Serializable {
public IPredicateEvaluator createPredicateEvaluator();
IPredicateEvaluator createPredicateEvaluator();
}
@@ -24,7 +24,6 @@
/*
* Provides PredicateEvaluatorFactory based on (equi-join) keys
*/

public interface IPredicateEvaluatorFactoryProvider extends Serializable {
public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys);
}
@@ -28,16 +28,21 @@ public class HybridHashJoinUtil {
private HybridHashJoinUtil() {
}

public enum SIDE {
BUILD,
PROBE
}

/**
* Prints detailed information about the in-memory and spilled partitions.
* This method exists for debugging purposes.
*/
public String printPartitionInfo(BitSet spilledStatus, OptimizedHybridHashJoin.SIDE whichSide, int numOfPartitions,
int[] probePSizeInTups, int[] buildPSizeInTups, RunFileWriter[] probeRFWriters,
RunFileWriter[] buildRFWriters, IPartitionedTupleBufferManager bufferManager) {
public String printPartitionInfo(BitSet spilledStatus, SIDE whichSide, int numOfPartitions, int[] probePSizeInTups,
int[] buildPSizeInTups, RunFileWriter[] probeRFWriters, RunFileWriter[] buildRFWriters,
IPartitionedTupleBufferManager bufferManager) {
StringBuilder buf = new StringBuilder();
buf.append(">>> " + this + " " + Thread.currentThread().getId() + " printInfo():" + "\n");
if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
if (whichSide == SIDE.BUILD) {
buf.append("BUILD side" + "\n");
} else {
buf.append("PROBE side" + "\n");
@@ -49,7 +54,7 @@ public String printPartitionInfo(BitSet spilledStatus, OptimizedHybridHashJoin.S
int spilledPartByteSize = 0;
for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
spilledStatus.nextSetBit(pid + 1)) {
if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
if (whichSide == SIDE.BUILD) {
spilledTupleCount += buildPSizeInTups[pid];
spilledPartByteSize += buildRFWriters[pid].getFileSize();
buf.append("part:\t" + pid + "\t#tuple:\t" + buildPSizeInTups[pid] + "\tsize(MB):\t"
@@ -70,7 +75,7 @@ public String printPartitionInfo(BitSet spilledStatus, OptimizedHybridHashJoin.S
int inMemoryPartByteSize = 0;
for (int pid = spilledStatus.nextClearBit(0); pid >= 0 && pid < numOfPartitions; pid =
spilledStatus.nextClearBit(pid + 1)) {
if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
if (whichSide == SIDE.BUILD) {
inMemoryTupleCount += buildPSizeInTups[pid];
inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
} else {
