Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import com.google.protobuf.ByteString;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.util.TUtil;

public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>, Cloneable, GsonObject {
Expand All @@ -41,6 +41,7 @@ public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>,
@Expose private Long numNulls = null; // optional
@Expose private Datum minValue = null; // optional
@Expose private Datum maxValue = null; // optional
@Expose private boolean maxValueNull = false; //optional

public ColumnStats(Column column) {
this.column = column;
Expand All @@ -63,6 +64,9 @@ public ColumnStats(CatalogProtos.ColumnStatsProto proto) {
if (proto.hasMaxValue()) {
this.maxValue = DatumFactory.createFromBytes(getColumn().getDataType(), proto.getMaxValue().toByteArray());
}
if (proto.hasMaxValueNull()) {
this.maxValueNull = proto.getMaxValueNull();
}
}

public Column getColumn() {
Expand Down Expand Up @@ -109,6 +113,14 @@ public void setNumNulls(long numNulls) {
this.numNulls = numNulls;
}

/**
 * Returns whether a null value was observed while collecting this column's
 * statistics (set by the statistics collector when it analyzes a NullDatum).
 * Unlike {@code getMaxValue() == null}, this distinguishes "the column
 * contains nulls" from "no max value was recorded at all".
 *
 * @return {@code true} if a null was seen for this column
 */
public boolean isMaxValueNull() {
return maxValueNull;
}

/**
 * Marks whether this column's data contained null values, so that range
 * partitioning can extend the terminal range boundary to include nulls.
 *
 * @param maxValueNull {@code true} if a null value was observed for this column
 */
public void setMaxValueNull(boolean maxValueNull) {
this.maxValueNull = maxValueNull;
}

public boolean equals(Object obj) {
if (obj instanceof ColumnStats) {
ColumnStats other = (ColumnStats) obj;
Expand All @@ -134,6 +146,7 @@ public Object clone() throws CloneNotSupportedException {
stat.numNulls = numNulls;
stat.minValue = minValue;
stat.maxValue = maxValue;
stat.maxValueNull = maxValueNull;

return stat;
}
Expand Down Expand Up @@ -168,6 +181,7 @@ public CatalogProtos.ColumnStatsProto getProto() {
if (this.maxValue != null) {
builder.setMaxValue(ByteString.copyFrom(this.maxValue.asByteArray()));
}
builder.setMaxValueNull(maxValueNull);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public static TableStats aggregateTableStat(List<TableStats> tableStatses) {
css[i].getMaxValue().compareTo(cs.getMaxValue()) < 0)) {
css[i].setMaxValue(ts.getColumnStats().get(i).getMaxValue());
}
if (cs.isMaxValueNull()) {
css[i].setMaxValueNull(cs.isMaxValueNull());
}
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ message ColumnStatsProto {
optional int64 numNulls = 3;
optional bytes minValue = 4;
optional bytes maxValue = 5;
optional bool maxValueNull = 6;

}

enum StatType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static BigDecimal computeCardinality(DataType dataType, Datum start, Datu
break;
case TEXT:
final char textStart = (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0);
final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0);
final char textEnd = (end instanceof NullDatum || end.size() == 0) ? '0' : end.asChars().charAt(0);
if (isAscending) {
columnCard = new BigDecimal(textEnd - textStart);
} else {
Expand Down
70 changes: 60 additions & 10 deletions tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
Expand All @@ -44,6 +46,7 @@
import java.util.Map;

public class TupleUtil {
private static final Log LOG = LogFactory.getLog(TupleUtil.class);

public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
throws UnsupportedEncodingException {
Expand Down Expand Up @@ -72,7 +75,41 @@ public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncode
return sb.toString();
}

public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) {
/**
 * Patches the partitioned ranges so that sort columns containing nulls are
 * covered: for every sort column whose statistics report a null value
 * ({@link ColumnStats#isMaxValueNull()}), a {@code NullDatum} is pushed into
 * the boundary tuple of the terminal range — the end tuple of the last range
 * for an ascending key, or the start tuple of the first range for a
 * descending key.
 *
 * @param sortSpecs  sort orders, parallel (by index) to the columns of {@code sortSchema}
 * @param sortSchema schema holding the sort key columns in key order
 * @param colStats   per-column statistics gathered from the child output
 * @param ranges     partitioned tuple ranges, patched in place; may be null or empty
 */
public static void setMaxRangeIfNull(SortSpec[] sortSpecs, Schema sortSchema,
                                     List<ColumnStats> colStats, TupleRange[] ranges) {
  if (ranges == null || ranges.length == 0) {
    // The caller may pass an empty partition result (it only logs a warning);
    // there is nothing to patch in that case.
    return;
  }

  Map<Column, ColumnStats> statMap = Maps.newHashMap();
  for (ColumnStats stat : colStats) {
    statMap.put(stat.getColumn(), stat);
  }

  int i = 0;
  for (Column col : sortSchema.getColumns()) {
    ColumnStats columnStat = statMap.get(col);
    if (columnStat != null && columnStat.isMaxValueNull()) {
      int rangeIndex = sortSpecs[i].isAscending() ? ranges.length - 1 : 0;
      VTuple rangeTuple = sortSpecs[i].isAscending() ?
          (VTuple) ranges[rangeIndex].getEnd() : (VTuple) ranges[rangeIndex].getStart();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Set null into range: " + col.getQualifiedName() + ", previous tuple is " + rangeTuple);
      }
      rangeTuple.put(i, NullDatum.get());
      LOG.info("Set null into range: " + col.getQualifiedName() + ", current tuple is " + rangeTuple);
    }
    // Always advance: 'i' indexes sortSpecs and the tuple position in lockstep
    // with sortSchema's columns. The previous 'continue' on a missing stat
    // skipped this increment, mis-aligning every subsequent sort column.
    i++;
  }
}

public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats,
boolean checkNull) {

Map<Column, ColumnStats> statSet = Maps.newHashMap();
for (ColumnStats stat : colStats) {
Expand All @@ -98,16 +135,29 @@ public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target,
else
startTuple.put(i, DatumFactory.createNullDatum());

if (statSet.get(col).getMaxValue() != null)
endTuple.put(i, statSet.get(col).getMaxValue());
else
endTuple.put(i, DatumFactory.createNullDatum());
if (checkNull) {
if (statSet.get(col).isMaxValueNull() || statSet.get(col).getMaxValue() == null)
endTuple.put(i, DatumFactory.createNullDatum());
else
endTuple.put(i, statSet.get(col).getMaxValue());
} else {
if (statSet.get(col).getMaxValue() != null)
endTuple.put(i, statSet.get(col).getMaxValue());
else
endTuple.put(i, DatumFactory.createNullDatum());
}
} else {
if (statSet.get(col).getMaxValue() != null)
startTuple.put(i, statSet.get(col).getMaxValue());
else
startTuple.put(i, DatumFactory.createNullDatum());

if (checkNull) {
if (statSet.get(col).isMaxValueNull() || statSet.get(col).getMaxValue() == null)
startTuple.put(i, DatumFactory.createNullDatum());
else
startTuple.put(i, statSet.get(col).getMaxValue());
} else {
if (statSet.get(col).getMaxValue() != null)
startTuple.put(i, statSet.get(col).getMaxValue());
else
startTuple.put(i, DatumFactory.createNullDatum());
}
if (statSet.get(col).getMinValue() != null)
endTuple.put(i, statSet.get(col).getMinValue());
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,24 +569,37 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo
if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) {
return;
}
TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false);
RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
BigDecimal card = partitioner.getTotalCardinality();

// if the number of the range cardinality is less than the desired number of tasks,
// we set the the number of tasks to the number of range cardinality.
int determinedTaskNum;
if (card.compareTo(new BigDecimal(maxNum)) < 0) {
LOG.info("The range cardinality (" + card
LOG.info(subQuery.getId() + ", The range cardinality (" + card
+ ") is less then the desired number of tasks (" + maxNum + ")");
determinedTaskNum = card.intValue();
} else {
determinedTaskNum = maxNum;
}

LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum +
// for LOG
TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), true);
LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum +
" sub ranges (total units: " + determinedTaskNum + ")");
TupleRange [] ranges = partitioner.partition(determinedTaskNum);
if (ranges == null || ranges.length == 0) {
LOG.warn(subQuery.getId() + " no range infos.");
}
TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
if (LOG.isDebugEnabled()) {
if (ranges != null) {
for (TupleRange eachRange : ranges) {
LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
}
}
}

FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
SubQuery.scheduleFragment(subQuery, dummyFragment);
Expand Down
4 changes: 2 additions & 2 deletions tajo-core/src/main/java/org/apache/tajo/worker/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ private TaskCompletionReport getTaskCompletionReport() {
builder.setResultStats(new TableStats().getProto());
}

Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs();
Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
if (it.hasNext()) {
do {
Entry<Integer,String> entry = it.next();
Entry<Integer, String> entry = it.next();
ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
part.setPartId(entry.getKey());
builder.addShuffleFileOutputs(part.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.sql.ResultSet;
import java.util.TimeZone;

import static org.junit.Assert.assertEquals;

@Category(IntegrationTest.class)
public class TestSortQuery extends QueryTestCaseBase {

Expand Down Expand Up @@ -169,4 +177,46 @@ public final void testTopkWithJson() throws Exception {
assertResultSet(res);
cleanupQuery(res);
}

@Test
public final void testSortNullColumn() throws Exception {
try {
executeString("DROP TABLE table1 PURGE;").close();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be DROP TABLE IF EXISTS table1 PURGE. Otherwise, it will cause 'no such table error' when a developer executes this test separately.

testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2");
KeyValueSet tableOptions = new KeyValueSet();
tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");

Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("name", Type.TEXT);
String[] data = new String[]{
"1|BRAZIL",
"2|ALGERIA",
"3|ARGENTINA",
"4|CANADA"
};
TajoTestingCluster.createTable("table1", schema, tableOptions, data, 2);

ResultSet res = executeString(
"select * from (" +
"select case when id > 2 then null else id end as col1, name as col2 from table1) a " +
"order by col1, col2"
);

String expected = "col1,col2\n" +
"-------------------------------\n" +
"1,BRAZIL\n" +
"2,ALGERIA\n" +
"null,ARGENTINA\n" +
"null,CANADA\n";

assertEquals(expected, resultSetToString(res));

cleanupQuery(res);
} finally {
testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "0");
executeString("DROP TABLE table1 PURGE;").close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private RowStoreEncoder(Schema schema) {
for (int i = 0; i < schema.size(); i++) {
if (tuple.isNull(i)) {
nullFlags.set(i);
continue;
}

col = schema.getColumn(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TableStatistics {
private long numRows = 0;
private long numBytes = 0;

private boolean [] maxValueNulls;

private boolean [] comparable;

Expand All @@ -49,6 +50,7 @@ public TableStatistics(Schema schema) {
maxValues = new VTuple(schema.size());

numNulls = new long[schema.size()];
maxValueNulls = new boolean[schema.size()];
comparable = new boolean[schema.size()];

DataType type;
Expand Down Expand Up @@ -85,6 +87,7 @@ public long getNumBytes() {
public void analyzeField(int idx, Datum datum) {
if (datum instanceof NullDatum) {
numNulls[idx]++;
maxValueNulls[idx] = true;
return;
}

Expand Down Expand Up @@ -113,12 +116,13 @@ public TableStats getTableStat() {
LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
", expected=" + schema.getColumn(i).getDataType().getType() + ")");
}
if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
columnStats.setMaxValue(maxValues.get(i));
} else {
LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
", expected=" + schema.getColumn(i).getDataType().getType() + ")");
}
columnStats.setMaxValueNull(maxValueNulls[i]);
stat.addColumnStat(columnStats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ public FileChunk getFileCunks(Path outDir,

if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
comparator.compare(idxReader.getLastKey(), start) < 0) {
LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
"], but request start:" + start + ", end: " + end);
return null;
}
Expand Down