Skip to content

Commit

Permalink
Merge 79b697d into eb7a833
Browse files Browse the repository at this point in the history
  • Loading branch information
QiangCai committed May 5, 2019
2 parents eb7a833 + 79b697d commit 579db73
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 49 deletions.
Expand Up @@ -346,15 +346,9 @@ private boolean checkColumnSchemaEquality(List<ColumnSchema> obj1, List<ColumnSc
if (obj1 == null || obj2 == null || (obj1.size() != obj2.size())) {
return false;
}
List<ColumnSchema> clonedObj1 = new ArrayList<>(obj1);
List<ColumnSchema> clonedObj2 = new ArrayList<>(obj2);
clonedObj1.addAll(obj1);
clonedObj2.addAll(obj2);
sortList(clonedObj1);
sortList(clonedObj2);
boolean exists = true;
for (int i = 0; i < obj1.size(); i++) {
if (!clonedObj1.get(i).equalsWithStrictCheck(clonedObj2.get(i))) {
if (!obj1.get(i).equalsWithStrictCheck(obj2.get(i))) {
exists = false;
break;
}
Expand All @@ -372,11 +366,14 @@ private void sortList(List<ColumnSchema> columnSchemas) {

@Override public int hashCode() {
int allColumnsHashCode = 0;
// check column order
StringBuilder builder = new StringBuilder();
for (ColumnSchema columnSchema: columnsInTable) {
allColumnsHashCode = allColumnsHashCode + columnSchema.strictHashCode();
builder.append(columnSchema.getColumnUniqueId()).append(",");
}
return carbonTable.getAbsoluteTableIdentifier().hashCode() + allColumnsHashCode + Arrays
.hashCode(columnCardinality);
.hashCode(columnCardinality) + builder.toString().hashCode();
}

public AbsoluteTableIdentifier getTableIdentifier() {
Expand Down
Expand Up @@ -605,7 +605,7 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
// setting the size of fixed key column (dictionary column)
blockExecutionInfo
.setFixedLengthKeySize(getKeySize(projectDimensions, segmentProperties));
Set<Integer> dictionaryColumnChunkIndex = new HashSet<Integer>();
List<Integer> dictionaryColumnChunkIndex = new ArrayList<Integer>();
List<Integer> noDictionaryColumnChunkIndex = new ArrayList<Integer>();
// get the block index to be read from file for query dimension
// for both dictionary columns and no dictionary columns
Expand All @@ -616,7 +616,9 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
dictionaryColumnChunkIndex.toArray(new Integer[dictionaryColumnChunkIndex.size()]));
// need to sort the dictionary column as for all dimension
// column key will be filled based on key order
Arrays.sort(queryDictionaryColumnChunkIndexes);
if (!queryModel.isForcedDetailRawQuery()) {
Arrays.sort(queryDictionaryColumnChunkIndexes);
}
blockExecutionInfo.setDictionaryColumnChunkIndex(queryDictionaryColumnChunkIndexes);
// setting the no dictionary column block indexes
blockExecutionInfo.setNoDictionaryColumnChunkIndexes(ArrayUtils.toPrimitive(
Expand Down
Expand Up @@ -509,7 +509,7 @@ public static byte[] getMaskedKey(byte[] data, byte[] maxKey, int[] maskByteRang
public static void fillQueryDimensionChunkIndexes(
List<ProjectionDimension> projectDimensions,
Map<Integer, Integer> columnOrdinalToChunkIndexMapping,
Set<Integer> dictionaryDimensionChunkIndex,
List<Integer> dictionaryDimensionChunkIndex,
List<Integer> noDictionaryDimensionChunkIndex) {
for (ProjectionDimension queryDimension : projectDimensions) {
if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)
Expand Down
Expand Up @@ -28,9 +28,15 @@
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.DataTypeUtil;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -58,6 +64,15 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
private Object[] currentRawRow = null;
private boolean isBackupFilled = false;

// column reorder for no-dictionary column
private int noDictCount;
private int[] noDictMap;
// column drift
private final boolean hasColumnDrift;
private boolean[] isColumnDrift;
private int measureCount;
private DataType[] measureDataTypes;

/**
* LOGGER
*/
Expand All @@ -66,18 +81,62 @@ public class RawResultIterator extends CarbonIterator<Object[]> {

public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
boolean isStreamingHandoff) {
boolean isStreamingHandoff, boolean hasColumnDrift) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
this.destinationSegProperties = destinationSegProperties;
this.executorService = Executors.newFixedThreadPool(1);

this.hasColumnDrift = hasColumnDrift;
if (!isStreamingHandoff) {
init();
}
}

private void initForColumnDrift() {
List<CarbonDimension> noDictDims =
new ArrayList<>(destinationSegProperties.getDimensions().size());
for (CarbonDimension dimension : destinationSegProperties.getDimensions()) {
if (dimension.getNumberOfChild() == 0) {
if (!dimension.hasEncoding(Encoding.DICTIONARY)) {
noDictDims.add(dimension);
}
}
}
measureCount = destinationSegProperties.getMeasures().size();
noDictCount = noDictDims.size();
isColumnDrift = new boolean[noDictCount];
noDictMap = new int[noDictCount];
measureDataTypes = new DataType[noDictCount];
List<CarbonMeasure> sourceMeasures = sourceSegProperties.getMeasures();
int tableMeasureCount = sourceMeasures.size();
for (int i = 0; i < noDictCount; i++) {
for (int j = 0; j < tableMeasureCount; j++) {
if (RestructureUtil.isColumnMatches(true, noDictDims.get(i), sourceMeasures.get(j))) {
isColumnDrift[i] = true;
measureDataTypes[i] = sourceMeasures.get(j).getDataType();
break;
}
}
if (measureDataTypes[i] == null) {
isColumnDrift[i] = false;
}
}
int noDictIndex = 0;
// the column drift are at the end of measures
int measureIndex = measureCount + 1;
for (int i = 0; i < noDictCount; i++) {
if (isColumnDrift[i]) {
noDictMap[i] = measureIndex++;
} else {
noDictMap[i] = noDictIndex++;
}
}
}

private void init() {
if (hasColumnDrift) {
initForColumnDrift();
}
this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
Expand Down Expand Up @@ -194,11 +253,34 @@ public Object[] fetchConverted() throws KeyGenException {
}

private Object[] convertRow(Object[] rawRow) throws KeyGenException {
byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
ByteArrayWrapper dimObject = (ByteArrayWrapper) rawRow[0];
byte[] dims = dimObject.getDictionaryKey();
long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
byte[] covertedBytes =
destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes);
dimObject.setDictionaryKey(covertedBytes);
if (hasColumnDrift) {
// need move measure to dimension and return new row by current schema
byte[][] noDicts = dimObject.getNoDictionaryKeys();
byte[][] newNoDicts = new byte[noDictCount][];
for (int i = 0; i < noDictCount; i++) {
if (isColumnDrift[i]) {
newNoDicts[i] = DataTypeUtil
.getBytesDataDataTypeForNoDictionaryColumn(rawRow[noDictMap[i]], measureDataTypes[i]);
} else {
newNoDicts[i] = noDicts[noDictMap[i]];
}
}
ByteArrayWrapper newWrapper = new ByteArrayWrapper();
newWrapper.setDictionaryKey(covertedBytes);
newWrapper.setNoDictionaryKeys(newNoDicts);
newWrapper.setComplexTypesKeys(dimObject.getComplexTypesKeys());
newWrapper.setImplicitColumnByteArray(dimObject.getImplicitColumnByteArray());
Object[] finalRawRow = new Object[1 + measureCount];
finalRawRow[0] = newWrapper;
System.arraycopy(rawRow, 1, finalRawRow, 1, measureCount);
return finalRawRow;
}
return rawRow;
}

Expand Down
Expand Up @@ -218,4 +218,7 @@ public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
this.implicitColumnByteArray = implicitColumnByteArray;
}

public byte[] getImplicitColumnByteArray() {
return implicitColumnByteArray;
}
}

0 comments on commit 579db73

Please sign in to comment.