[HOTFIX] Fixed data map loading issue when the number of segments is high
Problem:
When the number of segments is high, data map loading sometimes throws an NPE.

Solution:
When two segments share the same schema, finishing the load of the first segment clears the segment properties cache, including the cached min/max columns, while the second segment is still loading.
A check is now added: if the min/max columns are not present, they are fetched again. Each segment still clears the min/max cache after it finishes loading, so there is no leak.

This closes #3169
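The fix guards the min/max columns with a lazy, double-checked re-fetch under a lock. As a rough illustration of that pattern only (class, field, and method names below are hypothetical stand-ins, not CarbonData's actual API), a minimal self-contained Java sketch:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MinMaxCacheSketch {
      private static final Object minMaxLock = new Object();
      private List<String> minMaxCacheColumns;

      // Simulates another segment finishing its load and invalidating the cache.
      public void clear() {
        synchronized (minMaxLock) {
          minMaxCacheColumns = null;
        }
      }

      // Double-checked lazy re-fetch: if the cache was cleared concurrently,
      // rebuild it under the lock instead of returning null (the source of the NPE).
      public List<String> getMinMaxCacheColumns() {
        if (null == minMaxCacheColumns) {
          synchronized (minMaxLock) {
            if (null == minMaxCacheColumns) {
              minMaxCacheColumns = loadMinMaxColumns();
            }
          }
        }
        return minMaxCacheColumns;
      }

      // Stand-in for reading the min/max columns from the table schema.
      private List<String> loadMinMaxColumns() {
        return new ArrayList<>(Arrays.asList("col1", "col2"));
      }

      public static void main(String[] args) {
        MinMaxCacheSketch cache = new MinMaxCacheSketch();
        System.out.println(cache.getMinMaxCacheColumns()); // builds the cache
        cache.clear();                                      // cache invalidated by another segment
        System.out.println(cache.getMinMaxCacheColumns()); // rebuilt on next read instead of NPE
      }
    }

The main method exercises the scenario from the problem description: a cleared cache is simply rebuilt on the next read rather than handed out as null.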
kumarvishal09 authored and ravipesala committed Mar 29, 2019
1 parent 38c81e2 commit 0b93ba3
Showing 2 changed files with 39 additions and 27 deletions.
@@ -235,20 +235,20 @@ public void invalidate(String segmentId, int segmentPropertiesIndex,
segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId);
}
// if after removal of given SegmentId, the segmentIdSet becomes empty that means this
// segmentPropertiesWrapper is not getting used at all. In that case this object can be
// removed from all the holders
if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
.isEmpty()) {
indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
} else if (!clearSegmentWrapperFromMap
&& segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
// min max columns can vary when the cache is modified. So even though the entry is not
// required to be deleted from the map, clear the column cache so that it can be filled again
segmentPropertiesWrapper.clear();
LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
// if after removal of given SegmentId, the segmentIdSet becomes empty that means this
// segmentPropertiesWrapper is not getting used at all. In that case this object can be
// removed from all the holders
if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
.isEmpty()) {
indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
} else if (!clearSegmentWrapperFromMap
&& segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
// min max columns can vary when the cache is modified. So even though the entry is not
// required to be deleted from the map, clear the column cache so that it can be filled again
segmentPropertiesWrapper.clear();
LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
}
}
}
}
@@ -280,12 +280,13 @@ public static class SegmentPropertiesWrapper {

private static final Object taskSchemaLock = new Object();
private static final Object fileFooterSchemaLock = new Object();
private static final Object minMaxLock = new Object();

private AbsoluteTableIdentifier tableIdentifier;
private List<ColumnSchema> columnsInTable;
private int[] columnCardinality;
private SegmentProperties segmentProperties;
private List<CarbonColumn> minMaxCacheColumns;
private CarbonTable carbonTable;
// in case of hybrid store we can have block as well as blocklet schema
// Scenario: When there is a hybrid store in which few loads are from legacy store which do
// not contain the blocklet information and hence they will be, by default have cache_level as
@@ -300,7 +301,7 @@ public static class SegmentPropertiesWrapper {

public SegmentPropertiesWrapper(CarbonTable carbonTable,
List<ColumnSchema> columnsInTable, int[] columnCardinality) {
this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
this.carbonTable = carbonTable;
this.columnsInTable = columnsInTable;
this.columnCardinality = columnCardinality;
}
@@ -320,8 +321,9 @@ public void addMinMaxColumns(CarbonTable carbonTable) {
*/
public void clear() {
if (null != minMaxCacheColumns) {
minMaxCacheColumns.clear();
minMaxCacheColumns = null;
}

taskSummarySchemaForBlock = null;
taskSummarySchemaForBlocklet = null;
fileFooterEntrySchemaForBlock = null;
@@ -334,7 +336,8 @@ public void clear() {
}
SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper other =
(SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper) obj;
return tableIdentifier.equals(other.tableIdentifier) && checkColumnSchemaEquality(
return carbonTable.getAbsoluteTableIdentifier()
.equals(other.carbonTable.getAbsoluteTableIdentifier()) && checkColumnSchemaEquality(
columnsInTable, other.columnsInTable) && Arrays
.equals(columnCardinality, other.columnCardinality);
}
@@ -372,12 +375,12 @@ private void sortList(List<ColumnSchema> columnSchemas) {
for (ColumnSchema columnSchema: columnsInTable) {
allColumnsHashCode = allColumnsHashCode + columnSchema.strictHashCode();
}
return tableIdentifier.hashCode() + allColumnsHashCode + Arrays
return carbonTable.getAbsoluteTableIdentifier().hashCode() + allColumnsHashCode + Arrays
.hashCode(columnCardinality);
}

public AbsoluteTableIdentifier getTableIdentifier() {
return tableIdentifier;
return carbonTable.getAbsoluteTableIdentifier();
}

public SegmentProperties getSegmentProperties() {
@@ -398,8 +401,8 @@ public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean storeBlockletCount
synchronized (taskSchemaLock) {
if (null == taskSummarySchemaForBlock) {
taskSummarySchemaForBlock = SchemaGenerator
.createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
filePathToBeStored);
.createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
storeBlockletCount, filePathToBeStored);
}
}
}
@@ -412,8 +415,8 @@ public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean storeBlockletCo
synchronized (taskSchemaLock) {
if (null == taskSummarySchemaForBlocklet) {
taskSummarySchemaForBlocklet = SchemaGenerator
.createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
filePathToBeStored);
.createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
storeBlockletCount, filePathToBeStored);
}
}
}
@@ -425,7 +428,7 @@ public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
synchronized (fileFooterSchemaLock) {
if (null == fileFooterEntrySchemaForBlock) {
fileFooterEntrySchemaForBlock =
SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns);
SchemaGenerator.createBlockSchema(segmentProperties, getMinMaxCacheColumns());
}
}
}
@@ -437,14 +440,21 @@ public CarbonRowSchema[] getBlockletFileFooterEntrySchema() {
synchronized (fileFooterSchemaLock) {
if (null == fileFooterEntrySchemaForBlocklet) {
fileFooterEntrySchemaForBlocklet =
SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns);
SchemaGenerator.createBlockletSchema(segmentProperties, getMinMaxCacheColumns());
}
}
}
return fileFooterEntrySchemaForBlocklet;
}

public List<CarbonColumn> getMinMaxCacheColumns() {
if (null == minMaxCacheColumns) {
synchronized (minMaxLock) {
if (null == minMaxCacheColumns) {
addMinMaxColumns(carbonTable);
}
}
}
return minMaxCacheColumns;
}

@@ -460,7 +460,9 @@ protected DataMapRowImpl loadToUnsafeBlock(CarbonRowSchema[] schema,
addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
memoryDMStore.addIndexRow(schema, row);
} catch (Exception e) {
throw new RuntimeException(e);
String message = "Load to unsafe failed for block: " + filePath;
LOGGER.error(message, e);
throw new RuntimeException(message, e);
}
return summaryRow;
}
