Commit 5410b73

HIVE-29122: Vectorization - Support IGNORE NULLS for FIRST_VALUE and LAST_VALUE (#6009)
1 parent 3b97025 commit 5410b73

15 files changed: 3,928 additions and 33 deletions.
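
For context: with IGNORE NULLS, FIRST_VALUE and LAST_VALUE return the first (or last) non-null value in the window frame rather than the value of the raw first (or last) row, which may itself be NULL. Below is a minimal illustrative HiveQL sketch of the feature this commit vectorizes; the table t and its columns are invented for illustration and are not part of the commit, and the standard IGNORE NULLS placement between the function and OVER is assumed (Hive has historically exposed the same behavior through an optional boolean argument, e.g. first_value(val, true)).

-- first_any can be NULL when the first row of the partition has a NULL val;
-- first_non_null skips leading NULLs and returns the first non-null val in the frame.
SELECT
  grp, id, val,
  first_value(val) OVER (PARTITION BY grp ORDER BY id)               AS first_any,
  first_value(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id)  AS first_non_null,
  last_value(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)      AS last_non_null
FROM t;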

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java

Lines changed: 9 additions & 0 deletions
@@ -50,6 +50,7 @@ public abstract class VectorPTFEvaluatorBase {
   protected int inputColumnNum;
   protected final int outputColumnNum;
   private boolean nullsLast;
+  private boolean respectNulls = true;

   protected final Logger LOG = LoggerFactory.getLogger(getClass());

@@ -194,4 +195,12 @@ public boolean isCacheableForRange() {
    */
   public void mapCustomColumns(int[] bufferedColumnMap) {
   }
+
+  public boolean doesRespectNulls() {
+    return respectNulls;
+  }
+
+  public void setRespectNulls(boolean respectNulls) {
+    this.respectNulls = respectNulls;
+  }
 }

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java

Lines changed: 15 additions & 6 deletions
@@ -18,7 +18,6 @@

 package org.apache.hadoop.hive.ql.exec.vector.ptf;

-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -44,7 +43,6 @@ public class VectorPTFEvaluatorDecimalFirstValue extends VectorPTFEvaluatorBase
   public VectorPTFEvaluatorDecimalFirstValue(WindowFrameDef windowFrameDef,
       VectorExpression inputVecExpr, int outputColumnNum) {
     super(windowFrameDef, inputVecExpr, outputColumnNum);
-    firstValue = new HiveDecimalWritable();
     resetEvaluator();
   }

@@ -66,21 +64,32 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     }
     DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]);
     if (decimalColVector.isRepeating) {
-
       if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
         firstValue.set(decimalColVector.vector[0]);
         isGroupResultNull = false;
       }
     } else if (decimalColVector.noNulls) {
       firstValue.set(decimalColVector.vector[0]);
       isGroupResultNull = false;
-    } else {
+    } else if (doesRespectNulls()) {
       if (!decimalColVector.isNull[0]) {
         firstValue.set(decimalColVector.vector[0]);
         isGroupResultNull = false;
       }
+    } else {
+      // If we do not respect nulls, we just take the first value and ignore nulls.
+      for (int i = 0; i < size; i++) {
+        if (!decimalColVector.isNull[i]) {
+          firstValue.set(decimalColVector.vector[i]);
+          isGroupResultNull = false;
+          break;
+        }
+      }
     }
-    haveFirstValue = true;
+    // If nulls are respected, we set haveFirstValue to true as we don't need to look for a non-null value
+    // Otherwise, we should keep looking for a non-null value in the next batches, i.e.,
+    // until group result is not null.
+    haveFirstValue = doesRespectNulls() || !isGroupResultNull;
   }

   /*
@@ -122,7 +131,7 @@ public Object getGroupResult() {
   public void resetEvaluator() {
     haveFirstValue = false;
     isGroupResultNull = true;
-    firstValue.set(HiveDecimal.ZERO);
+    firstValue = new HiveDecimalWritable();
   }

   // this is not necessarily needed, because first_value is evaluated in a streaming way, therefore

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java

Lines changed: 12 additions & 4 deletions
@@ -43,7 +43,6 @@ public class VectorPTFEvaluatorDecimalLastValue extends VectorPTFEvaluatorBase {
   public VectorPTFEvaluatorDecimalLastValue(WindowFrameDef windowFrameDef,
       VectorExpression inputVecExpr, int outputColumnNum) {
     super(windowFrameDef, inputVecExpr, outputColumnNum);
-    lastValue = new HiveDecimalWritable();
     resetEvaluator();
   }

@@ -64,7 +63,6 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     }
     DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]);
     if (decimalColVector.isRepeating) {
-
       if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
         lastValue.set(decimalColVector.vector[0]);
         isGroupResultNull = false;
@@ -74,14 +72,24 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     } else if (decimalColVector.noNulls) {
       lastValue.set(decimalColVector.vector[size - 1]);
       isGroupResultNull = false;
-    } else {
+    } else if (doesRespectNulls()) {
       final int lastBatchIndex = size - 1;
       if (!decimalColVector.isNull[lastBatchIndex]) {
         lastValue.set(decimalColVector.vector[lastBatchIndex]);
         isGroupResultNull = false;
       } else {
         isGroupResultNull = true;
       }
+    } else {
+      // If we do not respect nulls, we can keep checking from the end of the batch
+      isGroupResultNull = !lastValue.isSet();
+      for (int i = size - 1; i >= 0; i--) {
+        if (!decimalColVector.isNull[i]) {
+          lastValue.set(decimalColVector.vector[i]);
+          isGroupResultNull = false;
+          break;
+        }
+      }
     }
   }

@@ -109,7 +117,7 @@ public Object getGroupResult() {
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    lastValue.set(HiveDecimal.ZERO);
+    lastValue = new HiveDecimalWritable();
   }

   public boolean isCacheableForRange() {

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleFirstValue.java

Lines changed: 16 additions & 5 deletions
@@ -37,7 +37,7 @@ public class VectorPTFEvaluatorDoubleFirstValue extends VectorPTFEvaluatorBase {

   protected boolean haveFirstValue;
   protected boolean isGroupResultNull;
-  protected double firstValue;
+  protected Double firstValue = null;

   public VectorPTFEvaluatorDoubleFirstValue(WindowFrameDef windowFrameDef,
       VectorExpression inputVecExpr, int outputColumnNum) {
@@ -63,21 +63,32 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     }
     DoubleColumnVector doubleColVector = ((DoubleColumnVector) batch.cols[inputColumnNum]);
     if (doubleColVector.isRepeating) {
-
       if (doubleColVector.noNulls || !doubleColVector.isNull[0]) {
         firstValue = doubleColVector.vector[0];
         isGroupResultNull = false;
       }
     } else if (doubleColVector.noNulls) {
       firstValue = doubleColVector.vector[0];
       isGroupResultNull = false;
-    } else {
+    } else if (doesRespectNulls()) {
       if (!doubleColVector.isNull[0]) {
         firstValue = doubleColVector.vector[0];
         isGroupResultNull = false;
       }
+    } else {
+      // If we do not respect nulls, we just take the first value and ignore nulls.
+      for (int i = 0; i < size; i++) {
+        if (!doubleColVector.isNull[i]) {
+          firstValue = doubleColVector.vector[i];
+          isGroupResultNull = false;
+          break;
+        }
+      }
     }
-    haveFirstValue = true;
+    // If nulls are respected, we set haveFirstValue to true as we don't need to look for a non-null value
+    // Otherwise, we should keep looking for a non-null value in the next batches, i.e.,
+    // until group result is not null.
+    haveFirstValue = doesRespectNulls() || !isGroupResultNull;
   }

   /*
@@ -119,7 +130,7 @@ public Object getGroupResult() {
   public void resetEvaluator() {
     haveFirstValue = false;
     isGroupResultNull = true;
-    firstValue = 0.0;
+    firstValue = null;
   }

   // this is not necessarily needed, because first_value is evaluated in a streaming way, therefore

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleLastValue.java

Lines changed: 13 additions & 4 deletions
@@ -35,7 +35,7 @@
 public class VectorPTFEvaluatorDoubleLastValue extends VectorPTFEvaluatorBase {

   protected boolean isGroupResultNull;
-  protected double lastValue;
+  protected Double lastValue = null;

   public VectorPTFEvaluatorDoubleLastValue(WindowFrameDef windowFrameDef,
       VectorExpression inputVecExpr, int outputColumnNum) {
@@ -60,7 +60,6 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     }
     DoubleColumnVector doubleColVector = ((DoubleColumnVector) batch.cols[inputColumnNum]);
     if (doubleColVector.isRepeating) {
-
       if (doubleColVector.noNulls || !doubleColVector.isNull[0]) {
         lastValue = doubleColVector.vector[0];
         isGroupResultNull = false;
@@ -70,14 +69,24 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     } else if (doubleColVector.noNulls) {
       lastValue = doubleColVector.vector[size - 1];
       isGroupResultNull = false;
-    } else {
+    } else if (doesRespectNulls()) {
       final int lastBatchIndex = size - 1;
       if (!doubleColVector.isNull[lastBatchIndex]) {
         lastValue = doubleColVector.vector[lastBatchIndex];
         isGroupResultNull = false;
       } else {
         isGroupResultNull = true;
       }
+    } else {
+      // If we do not respect nulls, we can keep checking from the end of the batch
+      isGroupResultNull = lastValue == null;
+      for (int i = size - 1; i >= 0; i--) {
+        if (!doubleColVector.isNull[i]) {
+          lastValue = doubleColVector.vector[i];
+          isGroupResultNull = false;
+          break;
+        }
+      }
     }
   }

@@ -105,7 +114,7 @@ public Object getGroupResult() {
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    lastValue = 0.0;
+    lastValue = null;
   }

   public boolean isCacheableForRange() {

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongFirstValue.java

Lines changed: 16 additions & 5 deletions
@@ -37,7 +37,7 @@ public class VectorPTFEvaluatorLongFirstValue extends VectorPTFEvaluatorBase {

   protected boolean haveFirstValue;
   protected boolean isGroupResultNull;
-  protected long firstValue;
+  protected Long firstValue = null;

   public VectorPTFEvaluatorLongFirstValue(WindowFrameDef windowFrameDef,
       VectorExpression inputVecExpr, int outputColumnNum) {
@@ -63,21 +63,32 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     }
     LongColumnVector longColVector = ((LongColumnVector) batch.cols[inputColumnNum]);
     if (longColVector.isRepeating) {
-
       if (longColVector.noNulls || !longColVector.isNull[0]) {
         firstValue = longColVector.vector[0];
         isGroupResultNull = false;
       }
     } else if (longColVector.noNulls) {
       firstValue = longColVector.vector[0];
       isGroupResultNull = false;
-    } else {
+    } else if (doesRespectNulls()) {
       if (!longColVector.isNull[0]) {
         firstValue = longColVector.vector[0];
         isGroupResultNull = false;
       }
+    } else {
+      // If we do not respect nulls, we just take the first value and ignore nulls.
+      for (int i = 0; i < size; i++) {
+        if (!longColVector.isNull[i]) {
+          firstValue = longColVector.vector[i];
+          isGroupResultNull = false;
+          break;
+        }
+      }
     }
-    haveFirstValue = true;
+    // If nulls are respected, we set haveFirstValue to true as we don't need to look for a non-null value
+    // Otherwise, we should keep looking for a non-null value in the next batches, i.e.,
+    // until group result is not null.
+    haveFirstValue = doesRespectNulls() || !isGroupResultNull;
   }

   /*
@@ -119,7 +130,7 @@ public Object getGroupResult() {
   public void resetEvaluator() {
     haveFirstValue = false;
     isGroupResultNull = true;
-    firstValue = 0;
+    firstValue = null;
   }

   // this is not necessarily needed, because first_value is evaluated in a streaming way, therefore

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorLongLastValue.java

Lines changed: 13 additions & 4 deletions
@@ -36,7 +36,7 @@
 public class VectorPTFEvaluatorLongLastValue extends VectorPTFEvaluatorBase {

   protected boolean isGroupResultNull;
-  protected long lastValue;
+  protected Long lastValue = null;

   public VectorPTFEvaluatorLongLastValue(WindowFrameDef windowFrameDef,
       VectorExpression inputVecExpr, int outputColumnNum) {
@@ -61,7 +61,6 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     }
     LongColumnVector longColVector = ((LongColumnVector) batch.cols[inputColumnNum]);
     if (longColVector.isRepeating) {
-
       if (longColVector.noNulls || !longColVector.isNull[0]) {
         lastValue = longColVector.vector[0];
         isGroupResultNull = false;
@@ -71,14 +70,24 @@ public void evaluateGroupBatch(VectorizedRowBatch batch)
     } else if (longColVector.noNulls) {
       lastValue = longColVector.vector[size - 1];
       isGroupResultNull = false;
-    } else {
+    } else if (doesRespectNulls()) {
       final int lastBatchIndex = size - 1;
       if (!longColVector.isNull[lastBatchIndex]) {
         lastValue = longColVector.vector[lastBatchIndex];
         isGroupResultNull = false;
       } else {
         isGroupResultNull = true;
       }
+    } else {
+      // If we do not respect nulls, we can keep checking from the end of the batch
+      isGroupResultNull = lastValue == null;
+      for (int i = size - 1; i >= 0; i--) {
+        if (!longColVector.isNull[i]) {
+          lastValue = longColVector.vector[i];
+          isGroupResultNull = false;
+          break;
+        }
+      }
     }
   }

@@ -106,7 +115,7 @@ public Object getGroupResult() {
   @Override
   public void resetEvaluator() {
     isGroupResultNull = true;
-    lastValue = 0;
+    lastValue = null;
   }

   public boolean isCacheableForRange() {

ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java

Lines changed: 10 additions & 3 deletions
@@ -4919,15 +4919,19 @@ private static VectorExpression fixDecimalDataTypePhysicalVariations(final Vecto
     return parent;
   }

-  private static void fillInPTFEvaluators(List<WindowFunctionDef> windowsFunctions,
-      String[] evaluatorFunctionNames, boolean[] evaluatorsAreDistinct,
+  private static void fillInPTFEvaluators(
+      List<WindowFunctionDef> windowsFunctions,
+      String[] evaluatorFunctionNames,
+      boolean[] evaluatorsAreDistinct,
+      boolean[] evaluatorsRespectNulls,
       WindowFrameDef[] evaluatorWindowFrameDefs,
-      List<ExprNodeDesc>[] evaluatorInputExprNodeDescLists) throws HiveException {
+      List<ExprNodeDesc>[] evaluatorInputExprNodeDescLists) {
     final int functionCount = windowsFunctions.size();
     for (int i = 0; i < functionCount; i++) {
       WindowFunctionDef winFunc = windowsFunctions.get(i);
       evaluatorFunctionNames[i] = winFunc.getName();
       evaluatorsAreDistinct[i] = winFunc.isDistinct();
+      evaluatorsRespectNulls[i] = winFunc.respectNulls();
       evaluatorWindowFrameDefs[i] = winFunc.getWindowFrame();

       List<PTFExpressionDef> args = winFunc.getArgs();
@@ -5038,13 +5042,15 @@ private static void createVectorPTFDesc(Operator<? extends OperatorDesc> ptfOp,

     String[] evaluatorFunctionNames = new String[functionCount];
     boolean[] evaluatorsAreDistinct = new boolean[functionCount];
+    boolean[] evaluatorsRespectNulls = new boolean[functionCount];
     WindowFrameDef[] evaluatorWindowFrameDefs = new WindowFrameDef[functionCount];
     List<ExprNodeDesc>[] evaluatorInputExprNodeDescLists = (List<ExprNodeDesc>[]) new List<?>[functionCount];

     fillInPTFEvaluators(
         windowsFunctions,
         evaluatorFunctionNames,
         evaluatorsAreDistinct,
+        evaluatorsRespectNulls,
         evaluatorWindowFrameDefs,
         evaluatorInputExprNodeDescLists);

@@ -5057,6 +5063,7 @@ private static void createVectorPTFDesc(Operator<? extends OperatorDesc> ptfOp,

     vectorPTFDesc.setEvaluatorFunctionNames(evaluatorFunctionNames);
     vectorPTFDesc.setEvaluatorsAreDistinct(evaluatorsAreDistinct);
+    vectorPTFDesc.setEvaluatorsRespectNulls(evaluatorsRespectNulls);
     vectorPTFDesc.setEvaluatorWindowFrameDefs(evaluatorWindowFrameDefs);
     vectorPTFDesc.setEvaluatorInputExprNodeDescLists(evaluatorInputExprNodeDescLists);
