From 3acb265add415a9f4722935f9a69d6cd97b6f987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Mon, 14 Jul 2014 12:31:21 +0900 Subject: [PATCH] TAJO-904: ORDER BY with a null column miss some data. --- .../tajo/catalog/statistics/ColumnStats.java | 20 +++++- .../catalog/statistics/StatisticsUtil.java | 3 + .../src/main/proto/CatalogProtos.proto | 2 + .../planner/RangePartitionAlgorithm.java | 2 +- .../apache/tajo/engine/utils/TupleUtil.java | 70 ++++++++++++++++--- .../master/querymaster/Repartitioner.java | 19 ++++- .../java/org/apache/tajo/worker/Task.java | 4 +- .../tajo/engine/query/TestSortQuery.java | 50 +++++++++++++ .../org/apache/tajo/storage/RowStoreUtil.java | 1 + .../apache/tajo/storage/TableStatistics.java | 8 ++- .../pullserver/TajoPullServerService.java | 2 +- 11 files changed, 159 insertions(+), 22 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java index 4d65d9a6c5..90b651fb62 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java @@ -24,13 +24,13 @@ import com.google.common.base.Objects; import com.google.gson.annotations.Expose; import com.google.protobuf.ByteString; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.json.GsonObject; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.json.CatalogGsonHelper; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.json.GsonObject; import org.apache.tajo.util.TUtil; public class ColumnStats implements ProtoObject, Cloneable, GsonObject { @@ -41,6 +41,7 @@ public class ColumnStats implements ProtoObject, @Expose private Long numNulls = null; // optional @Expose private Datum minValue = null; // optional @Expose private Datum maxValue = null; // optional + @Expose private boolean maxValueNull = false; //optional public ColumnStats(Column column) { this.column = column; @@ -63,6 +64,9 @@ public ColumnStats(CatalogProtos.ColumnStatsProto proto) { if (proto.hasMaxValue()) { this.maxValue = DatumFactory.createFromBytes(getColumn().getDataType(), proto.getMaxValue().toByteArray()); } + if (proto.hasMaxValueNull()) { + this.maxValueNull = proto.getMaxValueNull(); + } } public Column getColumn() { @@ -109,6 +113,14 @@ public void setNumNulls(long numNulls) { this.numNulls = numNulls; } + public boolean isMaxValueNull() { + return maxValueNull; + } + + public void setMaxValueNull(boolean maxValueNull) { + this.maxValueNull = maxValueNull; + } + public boolean equals(Object obj) { if (obj instanceof ColumnStats) { ColumnStats other = (ColumnStats) obj; @@ -134,6 +146,7 @@ public Object clone() throws CloneNotSupportedException { stat.numNulls = numNulls; stat.minValue = minValue; stat.maxValue = maxValue; + stat.maxValueNull = maxValueNull; return stat; } @@ -168,6 +181,7 @@ public CatalogProtos.ColumnStatsProto getProto() { if (this.maxValue != null) { builder.setMaxValue(ByteString.copyFrom(this.maxValue.asByteArray())); } + builder.setMaxValueNull(maxValueNull); return builder.build(); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java index c481276c33..639ee7091b 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java @@ -132,6 +132,9 @@ public static TableStats aggregateTableStat(List tableStatses) { css[i].getMaxValue().compareTo(cs.getMaxValue()) < 0)) { css[i].setMaxValue(ts.getColumnStats().get(i).getMaxValue()); } + if (cs.isMaxValueNull()) { + css[i].setMaxValueNull(cs.isMaxValueNull()); + } } catch (Exception e) { LOG.warn(e.getMessage(), e); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 367d0b8380..f019320b7f 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -205,6 +205,8 @@ message ColumnStatsProto { optional int64 numNulls = 3; optional bytes minValue = 4; optional bytes maxValue = 5; + optional bool maxValueNull = 6; + } enum StatType { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java index a3522c7e65..0aa6f975a0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java @@ -115,7 +115,7 @@ public static BigDecimal computeCardinality(DataType dataType, Datum start, Datu break; case TEXT: final char textStart = (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0); - final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0); + final char textEnd = (end instanceof NullDatum || end.size() == 0) ? '0' : end.asChars().charAt(0); if (isAscending) { columnCard = new BigDecimal(textEnd - textStart); } else { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index 86f4935583..f40b6b10c9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -22,6 +22,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; @@ -44,6 +46,7 @@ import java.util.Map; public class TupleUtil { + private static final Log LOG = LogFactory.getLog(TupleUtil.class); public static String rangeToQuery(Schema schema, TupleRange range, boolean last) throws UnsupportedEncodingException { @@ -72,7 +75,41 @@ public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncode return sb.toString(); } - public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List colStats) { + /** + * if max value is null, set ranges[last] + * @param sortSpecs + * @param sortSchema + * @param colStats + * @param ranges + */ + public static void setMaxRangeIfNull(SortSpec[] sortSpecs, Schema sortSchema, + List colStats, TupleRange[] ranges) { + Map statMap = Maps.newHashMap(); + for (ColumnStats stat : colStats) { + statMap.put(stat.getColumn(), stat); + } + + int i = 0; + for (Column col : sortSchema.getColumns()) { + ColumnStats columnStat = statMap.get(col); + if (columnStat == null) { + continue; + } + if (columnStat.isMaxValueNull()) { + int rangeIndex = sortSpecs[i].isAscending() ? ranges.length - 1 : 0; + VTuple rangeTuple = sortSpecs[i].isAscending() ? (VTuple) ranges[rangeIndex].getEnd() : (VTuple) ranges[rangeIndex].getStart(); + if (LOG.isDebugEnabled()) { + LOG.debug("Set null into range: " + col.getQualifiedName() + ", previous tuple is " + rangeTuple); + } + rangeTuple.put(i, NullDatum.get()); + LOG.info("Set null into range: " + col.getQualifiedName() + ", current tuple is " + rangeTuple); + } + i++; + } + } + + public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List colStats, + boolean checkNull) { Map statSet = Maps.newHashMap(); for (ColumnStats stat : colStats) { @@ -98,16 +135,29 @@ public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, else startTuple.put(i, DatumFactory.createNullDatum()); - if (statSet.get(col).getMaxValue() != null) - endTuple.put(i, statSet.get(col).getMaxValue()); - else - endTuple.put(i, DatumFactory.createNullDatum()); + if (checkNull) { + if (statSet.get(col).isMaxValueNull() || statSet.get(col).getMaxValue() == null) + endTuple.put(i, DatumFactory.createNullDatum()); + else + endTuple.put(i, statSet.get(col).getMaxValue()); + } else { + if (statSet.get(col).getMaxValue() != null) + endTuple.put(i, statSet.get(col).getMaxValue()); + else + endTuple.put(i, DatumFactory.createNullDatum()); + } } else { - if (statSet.get(col).getMaxValue() != null) - startTuple.put(i, statSet.get(col).getMaxValue()); - else - startTuple.put(i, DatumFactory.createNullDatum()); - + if (checkNull) { + if (statSet.get(col).isMaxValueNull() || statSet.get(col).getMaxValue() == null) + startTuple.put(i, DatumFactory.createNullDatum()); + else + startTuple.put(i, statSet.get(col).getMaxValue()); + } else { + if (statSet.get(col).getMaxValue() != null) + startTuple.put(i, statSet.get(col).getMaxValue()); + else + startTuple.put(i, DatumFactory.createNullDatum()); + } if (statSet.get(col).getMinValue() != null) endTuple.put(i, statSet.get(col).getMinValue()); else diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index fc80bd1464..6031b76d0b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -564,7 +564,7 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) { return; } - TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats()); + TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false); RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); BigDecimal card = partitioner.getTotalCardinality(); @@ -572,16 +572,29 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo // we set the the number of tasks to the number of range cardinality. int determinedTaskNum; if (card.compareTo(new BigDecimal(maxNum)) < 0) { - LOG.info("The range cardinality (" + card + LOG.info(subQuery.getId() + ", The range cardinality (" + card + ") is less then the desired number of tasks (" + maxNum + ")"); determinedTaskNum = card.intValue(); } else { determinedTaskNum = maxNum; } - LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum + + // for LOG + TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), true); + LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); TupleRange [] ranges = partitioner.partition(determinedTaskNum); + if (ranges == null || ranges.length == 0) { + LOG.warn(subQuery.getId() + " no range infos."); + } + TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); + if (LOG.isDebugEnabled()) { + if (ranges != null) { + for (TupleRange eachRange : ranges) { + LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + } + } + } FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); SubQuery.scheduleFragment(subQuery, dummyFragment); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index a960f692dc..37571575c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -343,10 +343,10 @@ private TaskCompletionReport getTaskCompletionReport() { builder.setResultStats(new TableStats().getProto()); } - Iterator> it = context.getShuffleFileOutputs(); + Iterator> it = context.getShuffleFileOutputs(); if (it.hasNext()) { do { - Entry entry = it.next(); + Entry entry = it.next(); ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); part.setPartId(entry.getKey()); builder.addShuffleFileOutputs(part.build()); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java index 0b1831cac3..62cfbda33a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java @@ -21,13 +21,21 @@ import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.KeyValueSet; import org.junit.Test; import org.junit.experimental.categories.Category; import java.sql.ResultSet; import java.util.TimeZone; +import static org.junit.Assert.assertEquals; + @Category(IntegrationTest.class) public class TestSortQuery extends QueryTestCaseBase { @@ -169,4 +177,46 @@ public final void testTopkWithJson() throws Exception { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testSortNullColumn() throws Exception { + try { + executeString("DROP TABLE table1 PURGE;").close(); + testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2"); + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + String[] data = new String[]{ + "1|BRAZIL", + "2|ALGERIA", + "3|ARGENTINA", + "4|CANADA" + }; + TajoTestingCluster.createTable("table1", schema, tableOptions, data, 2); + + ResultSet res = executeString( + "select * from (" + + "select case when id > 2 then null else id end as col1, name as col2 from table1) a " + + "order by col1, col2" + ); + + String expected = "col1,col2\n" + + "-------------------------------\n" + + "1,BRAZIL\n" + + "2,ALGERIA\n" + + "null,ARGENTINA\n" + + "null,CANADA\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } finally { + testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "0"); + executeString("DROP TABLE table1 PURGE;").close(); + } + } } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 33b2ff36e5..5140a6374b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -181,6 +181,7 @@ private RowStoreEncoder(Schema schema) { for (int i = 0; i < schema.size(); i++) { if (tuple.isNull(i)) { nullFlags.set(i); + continue; } col = schema.getColumn(i); diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java index ac9bd8a89d..8801d06697 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -40,6 +40,7 @@ public class TableStatistics { private long numRows = 0; private long numBytes = 0; + private boolean [] maxValueNulls; private boolean [] comparable; @@ -49,6 +50,7 @@ public TableStatistics(Schema schema) { maxValues = new VTuple(schema.size()); numNulls = new long[schema.size()]; + maxValueNulls = new boolean[schema.size()]; comparable = new boolean[schema.size()]; DataType type; @@ -85,6 +87,7 @@ public long getNumBytes() { public void analyzeField(int idx, Datum datum) { if (datum instanceof NullDatum) { numNulls[idx]++; + maxValueNulls[idx] = true; return; } @@ -113,12 +116,13 @@ public TableStats getTableStat() { LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); } - if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { + if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { columnStats.setMaxValue(maxValues.get(i)); } else { - LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + + LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); } + columnStats.setMaxValueNull(maxValueNulls[i]); stat.addColumnStat(columnStats); } diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index cc3cb2eeba..3261177522 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -583,7 +583,7 @@ public FileChunk getFileCunks(Path outDir, if (comparator.compare(end, idxReader.getFirstKey()) < 0 || comparator.compare(idxReader.getLastKey(), start) < 0) { - LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + + LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + "], but request start:" + start + ", end: " + end); return null; }