[CARBONDATA-3062] Fix Compatibility issue with cache_level as blocklet
In the case of a hybrid store we can have block as well as blocklet schemas.
Scenario:
A hybrid store may contain a few loads from a legacy store that do not carry
blocklet information and therefore default to cache_level BLOCK, alongside
loads from the latest store that do carry BLOCKLET information and use
cache_level BLOCKLET. For these scenarios we need separate task and footer
schemas. For loads with or without blocklet info there is no additional cost
in maintaining the two variables.
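
As an illustration of the lookup this enables (a hedged sketch, not code from
this commit: wrapper stands for a SegmentPropertiesWrapper instance and
isLegacySegment is a hypothetical flag; only the two getter names below are
introduced by this change):

    // Pick the footer schema matching how the segment is cached: legacy
    // loads lack blocklet info and are cached at BLOCK level, newer loads
    // at BLOCKLET level.
    CarbonRowSchema[] footerSchema = isLegacySegment
        ? wrapper.getBlockFileFooterEntrySchema()       // cache_level = BLOCK
        : wrapper.getBlockletFileFooterEntrySchema();   // cache_level = BLOCKLET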

This closes #2883
Indhumathi27 authored and manishgupta88 committed Nov 2, 2018
1 parent 269f4c3 commit 6e58418
Showing 5 changed files with 57 additions and 37 deletions.
@@ -284,11 +284,17 @@ public static class SegmentPropertiesWrapper {
     private int[] columnCardinality;
     private SegmentProperties segmentProperties;
     private List<CarbonColumn> minMaxCacheColumns;
-    private CarbonRowSchema[] taskSummarySchema;
-    // same variable can be used for block and blocklet schema because at any given cache_level
-    // with either block or blocklet and whenever cache_level is changed the cache and its
-    // corresponding segmentProperties is flushed
-    private CarbonRowSchema[] fileFooterEntrySchema;
+    // in case of hybrid store we can have block as well as blocklet schema
+    // Scenario: when there is a hybrid store in which a few loads are from a legacy store that
+    // does not contain the blocklet information, those loads will by default have cache_level
+    // BLOCK, while a few loads from the latest store contain the BLOCKLET information and have
+    // cache_level BLOCKLET. For these scenarios we need to have separate task and footer
+    // schemas. For all loads with/without blocklet info there will not be any additional cost
+    // of maintaining 2 variables
+    private CarbonRowSchema[] taskSummarySchemaForBlock;
+    private CarbonRowSchema[] taskSummarySchemaForBlocklet;
+    private CarbonRowSchema[] fileFooterEntrySchemaForBlock;
+    private CarbonRowSchema[] fileFooterEntrySchemaForBlocklet;

     public SegmentPropertiesWrapper(CarbonTable carbonTable,
         List<ColumnSchema> columnsInTable, int[] columnCardinality) {
@@ -314,8 +320,10 @@ public void clear() {
       if (null != minMaxCacheColumns) {
         minMaxCacheColumns.clear();
       }
-      taskSummarySchema = null;
-      fileFooterEntrySchema = null;
+      taskSummarySchemaForBlock = null;
+      taskSummarySchemaForBlocklet = null;
+      fileFooterEntrySchemaForBlock = null;
+      fileFooterEntrySchemaForBlocklet = null;
     }

     @Override public boolean equals(Object obj) {
@@ -350,48 +358,62 @@ public int[] getColumnCardinality() {
       return columnCardinality;
     }

-    public CarbonRowSchema[] getTaskSummarySchema(boolean storeBlockletCount,
+    public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean storeBlockletCount,
         boolean filePathToBeStored) throws MemoryException {
-      if (null == taskSummarySchema) {
+      if (null == taskSummarySchemaForBlock) {
         synchronized (taskSchemaLock) {
-          if (null == taskSummarySchema) {
-            taskSummarySchema = SchemaGenerator
+          if (null == taskSummarySchemaForBlock) {
+            taskSummarySchemaForBlock = SchemaGenerator
                 .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
                     filePathToBeStored);
           }
         }
       }
-      return taskSummarySchema;
+      return taskSummarySchemaForBlock;
     }

+    public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean storeBlockletCount,
+        boolean filePathToBeStored) throws MemoryException {
+      if (null == taskSummarySchemaForBlocklet) {
+        synchronized (taskSchemaLock) {
+          if (null == taskSummarySchemaForBlocklet) {
+            taskSummarySchemaForBlocklet = SchemaGenerator
+                .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
+                    filePathToBeStored);
+          }
+        }
+      }
+      return taskSummarySchemaForBlocklet;
+    }
+
     public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
-      return getOrCreateFileFooterEntrySchema(true);
+      if (null == fileFooterEntrySchemaForBlock) {
+        synchronized (fileFooterSchemaLock) {
+          if (null == fileFooterEntrySchemaForBlock) {
+            fileFooterEntrySchemaForBlock =
+                SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns);
+          }
+        }
+      }
+      return fileFooterEntrySchemaForBlock;
     }

     public CarbonRowSchema[] getBlockletFileFooterEntrySchema() {
-      return getOrCreateFileFooterEntrySchema(false);
+      if (null == fileFooterEntrySchemaForBlocklet) {
+        synchronized (fileFooterSchemaLock) {
+          if (null == fileFooterEntrySchemaForBlocklet) {
+            fileFooterEntrySchemaForBlocklet =
+                SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns);
+          }
+        }
+      }
+      return fileFooterEntrySchemaForBlocklet;
     }

     public List<CarbonColumn> getMinMaxCacheColumns() {
       return minMaxCacheColumns;
     }

-    private CarbonRowSchema[] getOrCreateFileFooterEntrySchema(boolean isCacheLevelBlock) {
-      if (null == fileFooterEntrySchema) {
-        synchronized (fileFooterSchemaLock) {
-          if (null == fileFooterEntrySchema) {
-            if (isCacheLevelBlock) {
-              fileFooterEntrySchema =
-                  SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns);
-            } else {
-              fileFooterEntrySchema =
-                  SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns);
-            }
-          }
-        }
-      }
-      return fileFooterEntrySchema;
-    }
   }

 /**
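
All four getters above share the same double-checked locking idiom for lazy,
once-per-wrapper schema creation. A generic, self-contained sketch of that
idiom (names are illustrative, not CarbonData's; the canonical form also
declares the field volatile):

    final class LazySchemaHolder {
      // The unsynchronized first check keeps the common "already built" path
      // lock-free; the re-check inside the lock guarantees at-most-once
      // creation under contention.
      private volatile Object[] schema;
      private final Object lock = new Object();

      Object[] getSchema() {
        if (schema == null) {            // fast path, no lock taken
          synchronized (lock) {
            if (schema == null) {        // re-check while holding the lock
              schema = buildSchema();    // hypothetical factory, runs once
            }
          }
        }
        return schema;
      }

      private Object[] buildSchema() {
        return new Object[0];            // stand-in for the SchemaGenerator calls
      }
    }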
@@ -1006,7 +1006,7 @@ protected CarbonRowSchema[] getTaskSummarySchema() {
         SegmentPropertiesAndSchemaHolder.getInstance()
             .getSegmentPropertiesWrapper(segmentPropertiesIndex);
     try {
-      return segmentPropertiesWrapper.getTaskSummarySchema(true, isFilePathStored);
+      return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, isFilePathStored);
     } catch (MemoryException e) {
       throw new RuntimeException(e);
     }
@@ -87,7 +87,7 @@ protected CarbonRowSchema[] getTaskSummarySchema() {
         SegmentPropertiesAndSchemaHolder.getInstance()
             .getSegmentPropertiesWrapper(segmentPropertiesIndex);
     try {
-      return segmentPropertiesWrapper.getTaskSummarySchema(false, isFilePathStored);
+      return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false, isFilePathStored);
     } catch (MemoryException e) {
       throw new RuntimeException(e);
     }
@@ -92,7 +92,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
       expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = {
     val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
-      .getSegmentPropertiesWrapper(index).getTaskSummarySchema(storeBlockletCount, false)
+      .getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false)
     val minSchemas = summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
       .getChildSchemas
     minSchemas.length == expectedLength
@@ -30,9 +30,7 @@
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;

@@ -74,7 +72,6 @@ public RowResultMergerProcessor(String databaseName,
     this.loadModel = loadModel;
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);

-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     String carbonStoreLocation;
     if (partitionSpec != null) {
       carbonStoreLocation =

@@ -86,7 +83,8 @@ public RowResultMergerProcessor(String databaseName,
           loadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
-        .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
+        .getCarbonFactDataHandlerModel(loadModel,
+            loadModel.getCarbonDataLoadSchema().getCarbonTable(), segProp, tableName,
             tempStoreLocation, carbonStoreLocation);
     setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
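
The hunks above drop the CarbonMetadata singleton lookup in favor of the
CarbonTable already attached to the load model, so the merger no longer
depends on the table having been registered in the process-wide metadata
cache. Side by side (a sketch with illustrative variable names; both calls
appear verbatim in the diff):

    // Before: resolve the table through the process-wide singleton, which
    // only works if this JVM has registered the table in CarbonMetadata.
    CarbonTable fromSingleton =
        CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);

    // After: use the table that travels with the load model itself.
    CarbonTable fromLoadModel = loadModel.getCarbonDataLoadSchema().getCarbonTable();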
