Skip to content

Commit

Permalink
Upgrade to Presto 0.208 for better stability and memory management
Browse files Browse the repository at this point in the history
  • Loading branch information
bhavya411 committed Sep 11, 2018
1 parent 2ccdbb7 commit 0ccdee8
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 63 deletions.
2 changes: 1 addition & 1 deletion integration/presto/pom.xml
Expand Up @@ -31,7 +31,7 @@
<packaging>presto-plugin</packaging>

<properties>
<presto.version>0.187</presto.version>
<presto.version>0.208</presto.version>
<dev.path>${basedir}/../../dev</dev.path>
<jacoco.append>true</jacoco.append>
</properties>
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.apache.carbondata.presto.readers.SliceStreamReader;
import org.apache.carbondata.presto.readers.TimestampStreamReader;

import com.facebook.presto.spi.block.SliceArrayBlock;
import com.facebook.presto.spi.block.Block;

public class CarbonVectorBatch {

Expand All @@ -63,7 +63,7 @@ private CarbonVectorBatch(StructField[] schema, CarbonDictionaryDecodeReadSuppor

for (int i = 0; i < schema.length; ++i) {
columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i],
readSupport.getSliceArrayBlock(i));
readSupport.getDictionaryBlock(i));
}
}

Expand All @@ -73,7 +73,7 @@ public static CarbonVectorBatch allocate(StructField[] schema,
}

private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
StructField field, Dictionary dictionary, SliceArrayBlock dictionarySliceArrayBlock) {
StructField field, Dictionary dictionary, Block dictionaryBlock) {
if (dataType == DataTypes.BOOLEAN) {
return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
} else if (dataType == DataTypes.SHORT) {
Expand All @@ -87,7 +87,7 @@ private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType
} else if (dataType == DataTypes.DOUBLE) {
return new DoubleStreamReader(batchSize, field.getDataType(), dictionary);
} else if (dataType == DataTypes.STRING) {
return new SliceStreamReader(batchSize, field.getDataType(), dictionarySliceArrayBlock);
return new SliceStreamReader(batchSize, field.getDataType(), dictionaryBlock);
} else if (DataTypes.isDecimal(dataType)) {
return new DecimalSliceStreamReader(batchSize, (DecimalType) field.getDataType(), dictionary);
} else {
Expand Down
Expand Up @@ -64,7 +64,8 @@ public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableRead
}

public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
ConnectorSession session, ConnectorTableLayoutHandle layout) {
ConnectorSession session, ConnectorTableLayoutHandle layout,
SplitSchedulingStrategy splitSchedulingStrategy) {
CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
CarbondataTableHandle tableHandle = layoutHandle.getTable();
SchemaTableName key = tableHandle.getSchemaTableName();
Expand Down
Expand Up @@ -25,7 +25,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -43,7 +42,7 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
public BooleanStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -74,7 +73,7 @@ public BooleanStreamReader(int batchSize, DataType dataType, Dictionary dictiona
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}

}
Expand Up @@ -30,7 +30,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.Decimals;
import com.facebook.presto.spi.type.Type;
Expand All @@ -44,6 +43,8 @@
import static io.airlift.slice.Slices.utf8Slice;




/**
* Reader for DecimalValues
*/
Expand All @@ -61,7 +62,7 @@ public DecimalSliceStreamReader(int batchSize,
super(batchSize, dataType);
this.type = DecimalType.createDecimalType(dataType.getPrecision(), dataType.getScale());
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -93,7 +94,7 @@ public DecimalSliceStreamReader(int batchSize,
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}

private void decimalBlockWriter(BigDecimal value) {
Expand Down
Expand Up @@ -25,7 +25,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -45,7 +44,7 @@ public class DoubleStreamReader extends CarbonColumnVectorImpl implements Presto
public DoubleStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -76,6 +75,6 @@ public DoubleStreamReader(int batchSize, DataType dataType, Dictionary dictionar
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}
}
Expand Up @@ -25,7 +25,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -43,7 +42,7 @@ public class IntegerStreamReader extends CarbonColumnVectorImpl
public IntegerStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -74,7 +73,7 @@ public IntegerStreamReader(int batchSize, DataType dataType, Dictionary dictiona
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}

}
Expand Up @@ -25,7 +25,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -42,7 +41,7 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
public LongStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -73,6 +72,6 @@ public LongStreamReader(int batchSize, DataType dataType, Dictionary dictionary)
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}
}
Expand Up @@ -22,7 +22,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -40,7 +39,7 @@ public class ObjectStreamReader extends CarbonColumnVectorImpl implements Presto
public ObjectStreamReader(int batchSize, DataType dataType) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
}

@Override public Block buildBlock() {
Expand All @@ -60,7 +59,7 @@ public ObjectStreamReader(int batchSize, DataType dataType) {
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}

}
Expand Up @@ -25,7 +25,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.SmallintType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -42,7 +41,7 @@ public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoV
public ShortStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -73,6 +72,6 @@ public ShortStreamReader(int batchSize, DataType dataType, Dictionary dictionary
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}
}
Expand Up @@ -22,9 +22,7 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.SliceArrayBlock;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;

Expand All @@ -40,26 +38,28 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
protected Type type = VarcharType.VARCHAR;

protected BlockBuilder builder;

int[] values;
private SliceArrayBlock dictionarySliceArrayBlock;

private Block dictionaryBlock;

public SliceStreamReader(int batchSize, DataType dataType,
SliceArrayBlock dictionarySliceArrayBlock) {
Block dictionaryBlock) {
super(batchSize, dataType);
this.batchSize = batchSize;
if (dictionarySliceArrayBlock == null) {
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
if (dictionaryBlock == null) {
this.builder = type.createBlockBuilder(null, batchSize);
} else {
this.dictionarySliceArrayBlock = dictionarySliceArrayBlock;
this.dictionaryBlock = dictionaryBlock;
this.values = new int[batchSize];
}
}

@Override public Block buildBlock() {
if (dictionarySliceArrayBlock == null) {
if (dictionaryBlock == null) {
return builder.build();
} else {
return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
return new DictionaryBlock(batchSize, dictionaryBlock, values);
}
}

Expand All @@ -82,12 +82,12 @@ public SliceStreamReader(int batchSize, DataType dataType,
}

@Override public void putNull(int rowId) {
if (dictionarySliceArrayBlock == null) {
if (dictionaryBlock == null) {
builder.appendNull();
}
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}
}
Expand Up @@ -25,7 +25,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;

Expand All @@ -43,7 +42,7 @@ public class TimestampStreamReader extends CarbonColumnVectorImpl
public TimestampStreamReader(int batchSize, DataType dataType, Dictionary dictionary) {
super(batchSize, dataType);
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
}

Expand Down Expand Up @@ -74,6 +73,6 @@ public TimestampStreamReader(int batchSize, DataType dataType, Dictionary dictio
}

@Override public void reset() {
builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize);
builder = type.createBlockBuilder(null, batchSize);
}
}

0 comments on commit 0ccdee8

Please sign in to comment.