Skip to content

Commit

Permalink
[CARBONDATA-3157] Added lazy load and direct vector fill support to P…
Browse files Browse the repository at this point in the history
…resto

To improve scan performance, integrate the lazy loading and direct fill vector features into the Carbon Presto integration.

This PR also fixes query failures in the case of multiple-table joins and filters.

This closes #2978
  • Loading branch information
ravipesala authored and jackylk committed Dec 12, 2018
1 parent 1f4614a commit 6026cb5
Show file tree
Hide file tree
Showing 24 changed files with 482 additions and 37 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectWithInvertedIndex;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

Expand Down Expand Up @@ -83,20 +84,39 @@ public StringVectorFiller(int numberOfRows, int actualDataLength) {
@Override
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
boolean addSequential = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex
|| vector instanceof SequentialFill;

int localOffset = 0;
ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
vector.putNull(i);
} else {
vector.putArray(i, localOffset, length);
// In case of inverted index and sequential fill, add data to vector sequentially instead of
// adding offsets and data separately.
if (addSequential) {
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
vector.putNull(i);
} else {
vector.putByteArray(i, localOffset, length, data);
}
localOffset += length;
}
localOffset += length;
} else {
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
vector.putNull(i);
} else {
vector.putArray(i, localOffset, length);
}
localOffset += length;
}
vector.putAllByteArray(data, 0, actualDataLength);
}
vector.putAllByteArray(data, 0, actualDataLength);
}
}

Expand All @@ -111,7 +131,8 @@ public LongStringVectorFiller(int numberOfRows, int actualDataLength) {

@Override public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
boolean invertedIndex = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex;
boolean invertedIndex = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex
|| vector instanceof SequentialFill;
int localOffset = 0;
ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
if (invertedIndex) {
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
Expand Down Expand Up @@ -316,7 +317,8 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
}
}

if (deletedRows == null || deletedRows.isEmpty()) {
if ((deletedRows == null || deletedRows.isEmpty())
&& !(vectorInfo.vector instanceof SequentialFill)) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
Expand Down Expand Up @@ -318,7 +319,8 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true, false);
fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
if (deletedRows == null || deletedRows.isEmpty()) {
if ((deletedRows == null || deletedRows.isEmpty())
&& !(vectorInfo.vector instanceof SequentialFill)) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
Expand Down Expand Up @@ -302,7 +303,8 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
}
}

if (deletedRows == null || deletedRows.isEmpty()) {
if ((deletedRows == null || deletedRows.isEmpty())
&& !(vectorInfo.vector instanceof SequentialFill)) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
Expand Down Expand Up @@ -291,7 +292,8 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true, false);
fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
if (deletedRows == null || deletedRows.isEmpty()) {
if ((deletedRows == null || deletedRows.isEmpty())
&& !(vectorInfo.vector instanceof SequentialFill)) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;

Expand Down Expand Up @@ -239,7 +240,8 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true, false);
fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
if (deletedRows == null || deletedRows.isEmpty()) {
if ((deletedRows == null || deletedRows.isEmpty())
&& !(vectorInfo.vector instanceof SequentialFill)) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
Expand Down
Expand Up @@ -204,7 +204,8 @@ private void fillNoDictionaryData(CarbonColumnVector vector, ColumnVectorInfo co
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size, (long) defaultValue);
} else {
vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, (byte[])defaultValue);
vector.putByteArray(columnVectorInfo.vectorOffset, columnVectorInfo.size,
(byte[]) defaultValue);
}
} else {
vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
Expand Down
Expand Up @@ -68,7 +68,7 @@ public interface CarbonColumnVector {

void putByte(int rowId, byte value);

void putBytes(int rowId, int count, byte[] value);
void putByteArray(int rowId, int count, byte[] value);

void putBytes(int rowId, int count, byte[] src, int srcIndex);

Expand Down
Expand Up @@ -47,7 +47,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {

private short[] shorts;

private BitSet nullBytes;
protected BitSet nullBytes;

private DataType dataType;

Expand All @@ -69,6 +69,10 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {

private CarbonColumnVector dictionaryVector;

private LazyPageLoader lazyPage;

private boolean loaded;

public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
this.batchSize = batchSize;
nullBytes = new BitSet(batchSize);
Expand Down Expand Up @@ -163,7 +167,7 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
byteArr[rowId] = value;
}

@Override public void putBytes(int rowId, int count, byte[] value) {
@Override public void putByteArray(int rowId, int count, byte[] value) {
for (int i = 0; i < count; ++i) {
bytes[i + rowId] = value;
}
Expand Down Expand Up @@ -208,6 +212,9 @@ public boolean isNullAt(int rowId) {
}

@Override public Object getData(int rowId) {
if (!loaded) {
loadPage();
}
if (nullBytes.get(rowId)) {
return null;
}
Expand Down Expand Up @@ -243,6 +250,9 @@ public boolean isNullAt(int rowId) {
}

public Object getDataArray() {
if (!loaded) {
loadPage();
}
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
return byteArr;
} else if (dataType == DataTypes.SHORT) {
Expand Down Expand Up @@ -291,6 +301,7 @@ public Object getDataArray() {
} else {
Arrays.fill(data, null);
}
loaded = false;

}

Expand Down Expand Up @@ -367,7 +378,14 @@ public void setBlockDataType(DataType blockDataType) {
}

@Override public void setLazyPage(LazyPageLoader lazyPage) {
  // Only remember the loader here; the page is decoded on demand in loadPage(),
  // which readers invoke on first data access. Loading eagerly at this point
  // would defeat the lazy-load optimization the lazyPage/loaded fields implement.
  this.lazyPage = lazyPage;
}

/**
 * Forces the deferred page (if any was registered via setLazyPage) to be decoded
 * into this vector, then marks the vector as loaded so later reads skip this step.
 */
public void loadPage() {
  final LazyPageLoader pending = this.lazyPage;
  if (pending != null) {
    pending.loadPage();
  }
  this.loaded = true;
}

@Override public void putArray(int rowId, int offset, int length) {
Expand Down
Expand Up @@ -59,7 +59,7 @@ public void putDoubles(int rowId, int count, double value) {
}

@Override
public void putBytes(int rowId, int count, byte[] value) {
public void putByteArray(int rowId, int count, byte[] value) {
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

Expand Down
Expand Up @@ -41,15 +41,25 @@ public final class ColumnarVectorWrapperDirectFactory {
public static CarbonColumnVector getDirectVectorWrapperFactory(CarbonColumnVector columnVector,
int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists,
boolean isDictVector) {
if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null || deletedRows
.isEmpty())) {
// If it is sequential data filler then add the null bitset.
if (columnVector instanceof SequentialFill) {
// If it has inverted index then create a dummy delete rows bitset so that it goes to
// ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex, here it does the sequential
// filling using another vector.
if ((invertedIndex != null && invertedIndex.length > 0)) {
if (deletedRows == null) {
deletedRows = new BitSet();
}
} else if (deletedRows == null) {
((SequentialFill) columnVector).setNullBits(nullBitset);
}
}
if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null)) {
return new ColumnarVectorWrapperDirectWithInvertedIndex(columnVector, invertedIndex,
isnullBitsExists);
} else if ((invertedIndex == null || invertedIndex.length == 0) && (deletedRows != null
&& !deletedRows.isEmpty())) {
} else if ((invertedIndex == null || invertedIndex.length == 0) && deletedRows != null) {
return new ColumnarVectorWrapperDirectWithDeleteDelta(columnVector, deletedRows, nullBitset);
} else if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows != null
&& !deletedRows.isEmpty())) {
} else if ((invertedIndex != null && invertedIndex.length > 0) && deletedRows != null) {
return new ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(columnVector,
deletedRows, invertedIndex, nullBitset, isnullBitsExists, isDictVector);
} else {
Expand Down
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.scan.result.vector.impl.directread;

import java.util.BitSet;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;

/**
 * Marker-style interface that lets the execution engine know this is an
 * appendable/sequential data-adding vector: rows must be filled in order,
 * so random rowIds cannot be added to it.
 */
@InterfaceStability.Evolving
@InterfaceAudience.Internal
public interface SequentialFill {

  /**
   * Sets the null bitset for the page being filled.
   *
   * @param nullBits bitset whose set bits mark the row positions that are null
   */
  void setNullBits(BitSet nullBits);
}
Expand Up @@ -157,7 +157,7 @@ public CarbonColumnVectorWrapper(CarbonColumnVectorImpl columnVector, boolean[]
}
}

@Override public void putBytes(int rowId, int count, byte[] value) {
@Override public void putByteArray(int rowId, int count, byte[] value) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
columnVector.putByteArray(counter++, value);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;

import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalType;
Expand Down Expand Up @@ -68,8 +69,13 @@ private CarbonVectorBatch(StructField[] schema, CarbonDictionaryDecodeReadSuppor
}

public static CarbonVectorBatch allocate(StructField[] schema,
CarbonDictionaryDecodeReadSupport readSupport) {
return new CarbonVectorBatch(schema, readSupport, DEFAULT_BATCH_SIZE);
CarbonDictionaryDecodeReadSupport readSupport, boolean isDirectFill) {
if (isDirectFill) {
return new CarbonVectorBatch(schema, readSupport,
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
} else {
return new CarbonVectorBatch(schema, readSupport,DEFAULT_BATCH_SIZE);
}
}

private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
Expand Down
Expand Up @@ -161,6 +161,7 @@ private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock>
}
checkState(batchId == expectedBatchId);
try {
vectorReader.getColumnarBatch().column(columnIndex).loadPage();
PrestoVectorBlockBuilder blockBuilder =
(PrestoVectorBlockBuilder) vectorReader.getColumnarBatch().column(columnIndex);
blockBuilder.setBatchSize(lazyBlock.getPositionCount());
Expand Down
Expand Up @@ -100,6 +100,11 @@ private PrestoCarbonVectorizedRecordReader createReader(ConnectorSplit split,
checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
"split is not for this connector");
QueryModel queryModel = createQueryModel(carbondataSplit, columns);
if (carbonTableReader.config.getPushRowFilter() == null ||
carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
queryModel.setDirectVectorFill(true);
queryModel.setPreFetchData(false);
}
QueryExecutor queryExecutor =
QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
try {
Expand Down

0 comments on commit 6026cb5

Please sign in to comment.