Skip to content

Commit

Permalink
[CARBONDATA-3048] Added Lazy Loading For 2.2/2.1
Browse files Browse the repository at this point in the history
Problem:
Currently in 2.2/2.1 For Direct fill Lazy loading is not added because of this when data is huge and number of columns are high query is taking more time Lazy to execute.

Solution
Add Lazy loading for 2.2 and 2.1

Fixed Local Dictionary test case failure when it is enabled

This closes #2846
  • Loading branch information
kumarvishal09 authored and ravipesala committed Oct 27, 2018
1 parent 170c2f5 commit 58ba45e
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 386 deletions.
Expand Up @@ -17,11 +17,15 @@

package org.apache.carbondata.core.datastore.chunk.store.impl;

import java.util.BitSet;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.core.util.CarbonUtil;

/**
Expand Down Expand Up @@ -61,16 +65,24 @@ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] d
vector.setDictionary(dictionary);
dictionary.setDictionaryUsed();
}
BitSet nullBitset = new BitSet();
CarbonColumnVector dictionaryVector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector.getDictionaryVector(), invertedIndex, nullBitset,
vectorInfo.deletedRows, false, true);
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, invertedIndex, nullBitset, vectorInfo.deletedRows,
false, false);
for (int i = 0; i < rowsNum; i++) {
int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
vector.putNull(i);
vector.getDictionaryVector().putNull(i);
dictionaryVector.putNull(i);
} else {
vector.putNotNull(i);
vector.getDictionaryVector().putInt(i, surrogate);
dictionaryVector.putInt(i, surrogate);
}

}
if (dictionaryVector instanceof ConvertableVector) {
((ConvertableVector) dictionaryVector).convert();
}
}

Expand Down
Expand Up @@ -17,8 +17,6 @@

package org.apache.carbondata.core.datastore.chunk.store.impl.safe;

import java.nio.ByteBuffer;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
Expand All @@ -32,43 +30,38 @@
@InterfaceStability.Stable
public abstract class AbstractNonDictionaryVectorFiller {

protected int lengthSize;
protected int numberOfRows;

public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) {
this.lengthSize = lengthSize;
public AbstractNonDictionaryVectorFiller(int numberOfRows) {
this.numberOfRows = numberOfRows;
}

public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer);
public abstract void fillVector(byte[] data, CarbonColumnVector vector);

public int getLengthFromBuffer(ByteBuffer buffer) {
return buffer.getShort();
}
}

class NonDictionaryVectorFillerFactory {

public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
public static AbstractNonDictionaryVectorFiller getVectorFiller(int length, DataType type,
int numberOfRows) {
if (type == DataTypes.STRING) {
if (lengthSize > DataTypes.SHORT.getSizeInBytes()) {
return new LongStringVectorFiller(lengthSize, numberOfRows);
if (length > DataTypes.SHORT.getSizeInBytes()) {
return new LongStringVectorFiller(numberOfRows);
} else {
return new StringVectorFiller(lengthSize, numberOfRows);
return new StringVectorFiller(numberOfRows);
}
} else if (type == DataTypes.VARCHAR) {
return new LongStringVectorFiller(lengthSize, numberOfRows);
return new LongStringVectorFiller(numberOfRows);
} else if (type == DataTypes.TIMESTAMP) {
return new TimeStampVectorFiller(lengthSize, numberOfRows);
return new TimeStampVectorFiller(numberOfRows);
} else if (type == DataTypes.BOOLEAN) {
return new BooleanVectorFiller(lengthSize, numberOfRows);
return new BooleanVectorFiller(numberOfRows);
} else if (type == DataTypes.SHORT) {
return new ShortVectorFiller(lengthSize, numberOfRows);
return new ShortVectorFiller(numberOfRows);
} else if (type == DataTypes.INT) {
return new IntVectorFiller(lengthSize, numberOfRows);
return new IntVectorFiller(numberOfRows);
} else if (type == DataTypes.LONG) {
return new LongVectorFiller(lengthSize, numberOfRows);
return new LongVectorFiller(numberOfRows);
} else {
throw new UnsupportedOperationException("Not supported datatype : " + type);
}
Expand All @@ -79,208 +72,168 @@ public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, i

class StringVectorFiller extends AbstractNonDictionaryVectorFiller {

public StringVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
public StringVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int startOffset = 0;
// as first position will be start from length of bytes as data is stored first in the memory
// block we need to skip first two bytes this is because first two bytes will be length of the
// data which we have to skip
int currentOffset = lengthSize;
int localOffset = 0;
ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
for (int i = 0; i < numberOfRows - 1; i++) {
buffer.position(startOffset);
startOffset += getLengthFromBuffer(buffer) + lengthSize;
int length = startOffset - (currentOffset);
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
vector.putNull(i);
} else {
vector.putByteArray(i, currentOffset, length, data);
vector.putByteArray(i, localOffset, length, data);
}
currentOffset = startOffset + lengthSize;
}
// Handle last row
int length = (data.length - currentOffset);
if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
vector.putNull(numberOfRows - 1);
} else {
vector.putByteArray(numberOfRows - 1, currentOffset, length, data);
localOffset += length;
}
}
}

class LongStringVectorFiller extends StringVectorFiller {
public LongStringVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
class LongStringVectorFiller extends AbstractNonDictionaryVectorFiller {
public LongStringVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public int getLengthFromBuffer(ByteBuffer buffer) {
return buffer.getInt();
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int localOffset = 0;
ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
for (int i = 0; i < numberOfRows; i++) {
int length =
(((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 0xFF) << 16) | (
(data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] & 0xFF));
localOffset += 4;
if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
vector.putNull(i);
} else {
vector.putByteArray(i, localOffset, length, data);
}
localOffset += length;
}
}
}

class BooleanVectorFiller extends AbstractNonDictionaryVectorFiller {

public BooleanVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
public BooleanVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int startOffset = 0;
int currentOffset = lengthSize;
for (int i = 0; i < numberOfRows - 1; i++) {
buffer.position(startOffset);
startOffset += getLengthFromBuffer(buffer) + lengthSize;
int length = startOffset - (currentOffset);
int localOffset = 0;
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (length == 0) {
vector.putNull(i);
} else {
vector.putBoolean(i, ByteUtil.toBoolean(data[currentOffset]));
vector.putBoolean(i, ByteUtil.toBoolean(data[localOffset]));
}
currentOffset = startOffset + lengthSize;
}
int length = (data.length - currentOffset);
if (length == 0) {
vector.putNull(numberOfRows - 1);
} else {
vector.putBoolean(numberOfRows - 1, ByteUtil.toBoolean(data[currentOffset]));
localOffset += length;
}
}
}

class ShortVectorFiller extends AbstractNonDictionaryVectorFiller {

public ShortVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
public ShortVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int startOffset = 0;
int currentOffset = lengthSize;
for (int i = 0; i < numberOfRows - 1; i++) {
buffer.position(startOffset);
startOffset += getLengthFromBuffer(buffer) + lengthSize;
int length = startOffset - (currentOffset);
int localOffset = 0;
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (length == 0) {
vector.putNull(i);
} else {
vector.putShort(i, ByteUtil.toXorShort(data, currentOffset, length));
vector.putShort(i, ByteUtil.toXorShort(data, localOffset, length));
}
currentOffset = startOffset + lengthSize;
}
int length = (data.length - currentOffset);
if (length == 0) {
vector.putNull(numberOfRows - 1);
} else {
vector.putShort(numberOfRows - 1, ByteUtil.toXorShort(data, currentOffset, length));
localOffset += length;
}
}
}

class IntVectorFiller extends AbstractNonDictionaryVectorFiller {

public IntVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
public IntVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int startOffset = 0;
int currentOffset = lengthSize;
for (int i = 0; i < numberOfRows - 1; i++) {
buffer.position(startOffset);
startOffset += getLengthFromBuffer(buffer) + lengthSize;
int length = startOffset - (currentOffset);
int localOffset = 0;
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (length == 0) {
vector.putNull(i);
} else {
vector.putInt(i, ByteUtil.toXorInt(data, currentOffset, length));
vector.putInt(i, ByteUtil.toXorInt(data, localOffset, length));
}
currentOffset = startOffset + lengthSize;
}
int length = (data.length - currentOffset);
if (length == 0) {
vector.putNull(numberOfRows - 1);
} else {
vector.putInt(numberOfRows - 1, ByteUtil.toXorInt(data, currentOffset, length));
localOffset += length;
}
}
}

class LongVectorFiller extends AbstractNonDictionaryVectorFiller {

public LongVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
public LongVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int startOffset = 0;
int currentOffset = lengthSize;
for (int i = 0; i < numberOfRows - 1; i++) {
buffer.position(startOffset);
startOffset += getLengthFromBuffer(buffer) + lengthSize;
int length = startOffset - (currentOffset);
int localOffset = 0;
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (length == 0) {
vector.putNull(i);
} else {
vector.putLong(i, DataTypeUtil
.getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
.getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), localOffset,
length));
}
currentOffset = startOffset + lengthSize;
}
int length = (data.length - currentOffset);
if (length == 0) {
vector.putNull(numberOfRows - 1);
} else {
vector.putLong(numberOfRows - 1, DataTypeUtil
.getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
length));
localOffset += length;
}
}
}

class TimeStampVectorFiller extends AbstractNonDictionaryVectorFiller {

public TimeStampVectorFiller(int lengthSize, int numberOfRows) {
super(lengthSize, numberOfRows);
public TimeStampVectorFiller(int numberOfRows) {
super(numberOfRows);
}

@Override
public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
public void fillVector(byte[] data, CarbonColumnVector vector) {
// start position will be used to store the current data position
int startOffset = 0;
int currentOffset = lengthSize;
for (int i = 0; i < numberOfRows - 1; i++) {
buffer.position(startOffset);
startOffset += getLengthFromBuffer(buffer) + lengthSize;
int length = startOffset - (currentOffset);
int localOffset = 0;
for (int i = 0; i < numberOfRows; i++) {
int length = (((data[localOffset] & 0xFF) << 8) | (data[localOffset + 1] & 0xFF));
localOffset += 2;
if (length == 0) {
vector.putNull(i);
} else {
vector.putLong(i, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
vector.putLong(i, ByteUtil.toXorLong(data, localOffset, length) * 1000L);
}
currentOffset = startOffset + lengthSize;
}
int length = (data.length - currentOffset);
if (length == 0) {
vector.putNull(numberOfRows - 1);
} else {
vector.putLong(numberOfRows - 1, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
localOffset += length;
}
}
}

0 comments on commit 58ba45e

Please sign in to comment.