Commit 504b212: segregate block and blocklet cache

manishgupta88 committed Jul 3, 2018
1 parent dac5d3c commit 504b212
Showing 14 changed files with 919 additions and 810 deletions.
BlockletDataMapIndexStore.java

@@ -34,6 +34,7 @@
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
 /**
@@ -89,7 +90,8 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
             .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
                 carbonDataFileBlockMetaInfoMapping);
         BlockletDataMap blockletDataMap =
-            loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
+            loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
+                identifierWrapper.getCarbonTable());
         dataMaps.add(blockletDataMap);
         blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
       } else {
@@ -103,7 +105,8 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id
                 identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
                 carbonDataFileBlockMetaInfoMapping);
             BlockletDataMap blockletDataMap =
-                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
+                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
+                    identifierWrapper.getCarbonTable());
             dataMaps.add(blockletDataMap);
           }
           blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
@@ -230,10 +233,9 @@ public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIden
    * @return map of task id to segment mapping
    * @throws IOException
    */
-  private BlockletDataMap loadAndGetDataMap(
-      TableBlockIndexUniqueIdentifier identifier,
-      SegmentIndexFileStore indexFileStore,
-      Map<String, BlockMetaInfo> blockMetaInfoMap)
+  private BlockletDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
+      SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
+      CarbonTable carbonTable)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -243,11 +245,12 @@ private BlockletDataMap loadAndGetDataMap(
     }
     BlockletDataMap dataMap;
     synchronized (lock) {
+      boolean isCacheLevelBlock = BlockletDataMapUtil.isCacheLevelBlock(carbonTable);
       dataMap = new BlockletDataMap();
       dataMap.init(new BlockletDataMapModel(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId()));
+          blockMetaInfoMap, identifier.getSegmentId(), isCacheLevelBlock));
     }
     return dataMap;
   }
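Note: the new carbonTable argument is what lets loadAndGetDataMap decide whether to build the index at block or blocklet granularity. Below is a minimal sketch of such a decision, assuming a table-level "cache_level" property with values BLOCK (default) and BLOCKLET; the real check is BlockletDataMapUtil.isCacheLevelBlock, whose body is not shown in this diff.

import java.util.Map;

// Hypothetical sketch only; not the CarbonData implementation.
public class CacheLevelSketch {
  // Assumes a "cache_level" table property: "BLOCK" keeps one coarse index
  // entry per carbondata file, "BLOCKLET" keeps one per blocklet (finer,
  // larger cache).
  static boolean isCacheLevelBlock(Map<String, String> tableProperties) {
    String cacheLevel = tableProperties.getOrDefault("cache_level", "BLOCK");
    return !"BLOCKLET".equalsIgnoreCase(cacheLevel);
  }

  public static void main(String[] args) {
    System.out.println(isCacheLevelBlock(Map.of()));                          // true
    System.out.println(isCacheLevelBlock(Map.of("cache_level", "BLOCKLET"))); // false
  }
}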
BlockletDetailInfo.java

@@ -68,6 +68,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
   private byte[] columnSchemaBinary;
 
   private long blockSize;
+  /**
+   * flag to check for store from 1.1 or any prior version
+   */
+  private boolean isLegacyStore;
 
   public int getRowCount() {
     return rowCount;
@@ -172,6 +176,7 @@ public void setBlockSize(long blockSize) {
     out.writeInt(blockletInfoBinary.length);
     out.write(blockletInfoBinary);
     out.writeLong(blockSize);
+    out.writeBoolean(isLegacyStore);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -198,6 +203,7 @@ public void setBlockSize(long blockSize) {
     in.readFully(blockletInfoBinary);
     setBlockletInfoFromBinary();
     blockSize = in.readLong();
+    isLegacyStore = in.readBoolean();
   }
 
   /**
@@ -225,6 +231,7 @@ public BlockletDetailInfo copy() {
     detailInfo.blockFooterOffset = blockFooterOffset;
     detailInfo.columnSchemas = columnSchemas;
     detailInfo.blockSize = blockSize;
+    detailInfo.isLegacyStore = isLegacyStore;
     return detailInfo;
   }
 
@@ -263,4 +270,11 @@ public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
     this.blockletInfoBinary = blockletInfoBinary;
   }
 
+  public boolean isLegacyStore() {
+    return isLegacyStore;
+  }
+
+  public void setLegacyStore(boolean legacyStore) {
+    isLegacyStore = legacyStore;
+  }
 }
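The new isLegacyStore flag is appended as the last field in both write and readFields, so reader and writer must walk the fields in the same order. A self-contained round-trip sketch of this append-at-end Writable pattern (the values are illustrative, not taken from CarbonData):

import java.io.*;

public class WritableRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeLong(1024L);      // blockSize (pre-existing field)
    out.writeBoolean(true);    // isLegacyStore (new field, appended last)
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    long blockSize = in.readLong();         // fields must be read back
    boolean isLegacyStore = in.readBoolean(); // in exactly the write order
    System.out.println(blockSize + " " + isLegacyStore); // 1024 true
  }
}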
SafeMemoryDMStore.java

@@ -52,7 +52,7 @@ public SafeMemoryDMStore(CarbonRowSchema[] schema) {
   @Override
   public void addIndexRow(DataMapRow indexRow) throws MemoryException {
     dataMapRows.add(indexRow);
-    runningLength += indexRow.getTotalSizeInBytes();
+    runningLength += indexRow.getTotalSizeInBytes(schema);
   }
 
   @Override
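getTotalSizeInBytes now takes the schema as an argument, which suggests DataMapRow instances no longer carry their own schema reference (the UnsafeDataMapRow constructor in the next file loses its schema parameter too). An illustrative sketch of that trade, using made-up classes rather than CarbonData's: one schema array owned by the store replaces a per-row reference.

// Made-up classes for illustration; not CarbonData's DataMapRow.
final class RowHoldingSchema {
  final Object[] values;
  final int[] columnWidths;   // one extra reference stored in every cached row
  RowHoldingSchema(Object[] values, int[] columnWidths) {
    this.values = values;
    this.columnWidths = columnWidths;
  }
}

final class RowSchemaPassedIn {
  final Object[] values;      // no schema field: millions of cached rows each
                              // save one reference on the heap
  RowSchemaPassedIn(Object[] values) {
    this.values = values;
  }
  // the caller owns the single shared schema and passes it on every access
  int totalSizeInBytes(int[] columnWidths) {
    int size = 0;
    for (int width : columnWidths) {
      size += width;
    }
    return size;
  }
}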
UnsafeMemoryDMStore.java

@@ -89,96 +89,96 @@ private void increaseMemory() throws MemoryException {
    */
   public void addIndexRow(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
-    int rowSize = indexRow.getTotalSizeInBytes();
+    int rowSize = indexRow.getTotalSizeInBytes(schema);
     // Check whether allocated memory is sufficient or not.
     ensureSize(rowSize);
     int pointer = runningLength;
 
-    for (int i = 0; i < schema.length; i++) {
-      addToUnsafe(schema[i], indexRow, i);
-    }
+    addToUnsafe(schema, indexRow);
     pointers[rowCount++] = pointer;
   }
 
-  private void addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index) {
-    switch (schema.getSchemaType()) {
-      case FIXED:
-        DataType dataType = schema.getDataType();
-        if (dataType == DataTypes.BYTE) {
-          getUnsafe()
-              .putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getByte(index));
-          runningLength += row.getSizeInBytes(index);
-        } else if (dataType == DataTypes.SHORT) {
-          getUnsafe()
-              .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getShort(index));
-          runningLength += row.getSizeInBytes(index);
-        } else if (dataType == DataTypes.INT) {
-          getUnsafe()
-              .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getInt(index));
-          runningLength += row.getSizeInBytes(index);
-        } else if (dataType == DataTypes.LONG) {
-          getUnsafe()
-              .putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getLong(index));
-          runningLength += row.getSizeInBytes(index);
-        } else if (dataType == DataTypes.FLOAT) {
-          getUnsafe()
-              .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getFloat(index));
-          runningLength += row.getSizeInBytes(index);
-        } else if (dataType == DataTypes.DOUBLE) {
-          getUnsafe()
-              .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getDouble(index));
-          runningLength += row.getSizeInBytes(index);
-        } else if (dataType == DataTypes.BYTE_ARRAY) {
-          byte[] data = row.getByteArray(index);
-          getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-              memoryBlock.getBaseOffset() + runningLength, data.length);
-          runningLength += row.getSizeInBytes(index);
-        } else {
-          throw new UnsupportedOperationException(
-              "unsupported data type for unsafe storage: " + schema.getDataType());
-        }
-        break;
-      case VARIABLE_SHORT:
-        byte[] data = row.getByteArray(index);
-        getUnsafe().putShort(memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, (short) data.length);
-        runningLength += 2;
-        getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, data.length);
-        runningLength += data.length;
-        break;
-      case VARIABLE_INT:
-        byte[] data2 = row.getByteArray(index);
-        getUnsafe().putInt(memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, data2.length);
-        runningLength += 4;
-        getUnsafe().copyMemory(data2, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, data2.length);
-        runningLength += data2.length;
-        break;
-      case STRUCT:
-        CarbonRowSchema[] childSchemas =
-            ((CarbonRowSchema.StructCarbonRowSchema) schema).getChildSchemas();
-        DataMapRow struct = row.getRow(index);
-        for (int i = 0; i < childSchemas.length; i++) {
-          addToUnsafe(childSchemas[i], struct, i);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "unsupported data type for unsafe storage: " + schema.getDataType());
-    }
-  }
+  private void addToUnsafe(CarbonRowSchema[] carbonRowSchemas, DataMapRow row) {
+    for (int index = 0; index < carbonRowSchemas.length; index++) {
+      CarbonRowSchema rowSchema = carbonRowSchemas[index];
+      switch (rowSchema.getSchemaType()) {
+        case FIXED:
+          DataType dataType = rowSchema.getDataType();
+          if (dataType == DataTypes.BYTE) {
+            getUnsafe()
+                .putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getByte(carbonRowSchemas, index));
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else if (dataType == DataTypes.SHORT) {
+            getUnsafe()
+                .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getShort(carbonRowSchemas, index));
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else if (dataType == DataTypes.INT) {
+            getUnsafe()
+                .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getInt(carbonRowSchemas, index));
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else if (dataType == DataTypes.LONG) {
+            getUnsafe()
+                .putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getLong(carbonRowSchemas, index));
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else if (dataType == DataTypes.FLOAT) {
+            getUnsafe()
+                .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getFloat(carbonRowSchemas, index));
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else if (dataType == DataTypes.DOUBLE) {
+            getUnsafe()
+                .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getDouble(carbonRowSchemas, index));
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else if (dataType == DataTypes.BYTE_ARRAY) {
+            byte[] data = row.getByteArray(carbonRowSchemas, index);
+            getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+                memoryBlock.getBaseOffset() + runningLength, data.length);
+            runningLength += row.getSizeInBytes(carbonRowSchemas, index);
+          } else {
+            throw new UnsupportedOperationException(
+                "unsupported data type for unsafe storage: " + rowSchema.getDataType());
+          }
+          break;
+        case VARIABLE_SHORT:
+          byte[] data = row.getByteArray(carbonRowSchemas, index);
+          getUnsafe()
+              .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  (short) data.length);
+          runningLength += 2;
+          getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + runningLength, data.length);
+          runningLength += data.length;
+          break;
+        case VARIABLE_INT:
+          byte[] data2 = row.getByteArray(carbonRowSchemas, index);
+          getUnsafe()
+              .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  data2.length);
+          runningLength += 4;
+          getUnsafe().copyMemory(data2, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + runningLength, data2.length);
+          runningLength += data2.length;
+          break;
+        case STRUCT:
+          DataMapRow struct = row.getRow(carbonRowSchemas, index);
+          CarbonRowSchema[] childSchemas =
+              ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchemas[index]).getChildSchemas();
+          addToUnsafe(childSchemas, struct);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "unsupported data type for unsafe storage: " + rowSchema.getDataType());
+      }
+    }
+  }
 
   public DataMapRow getDataMapRow(int index) {
     assert (index < rowCount);
-    return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
+    return new UnsafeDataMapRow(memoryBlock, pointers[index]);
   }
 
   public void finishWriting() throws MemoryException {
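For reference, the VARIABLE_SHORT and VARIABLE_INT branches above store a variable-length field as a 2-byte or 4-byte length prefix followed by the raw bytes, advancing runningLength past both. The same layout, sketched with ByteBuffer in place of sun.misc.Unsafe:

import java.nio.ByteBuffer;

public class VariableLengthLayout {
  public static void main(String[] args) {
    byte[] data = "blocklet".getBytes();
    ByteBuffer block = ByteBuffer.allocate(2 + data.length);
    block.putShort((short) data.length);   // VARIABLE_SHORT: 2-byte length prefix
    block.put(data);                       // then the raw bytes

    block.flip();
    byte[] readBack = new byte[block.getShort()];  // read length, then payload
    block.get(readBack);
    System.out.println(new String(readBack)); // blocklet
  }
}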
(Diff for the remaining 10 changed files was not loaded.)
