Skip to content

Commit

Permalink
Fixed embedded mode caching bug
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Jun 27, 2019
1 parent 4ce1886 commit 18842b7
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 234 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
Expand Down Expand Up @@ -84,7 +85,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl

private String taskGroupDesc = "";

private String queryId = "";
private String queryId = UUID.randomUUID().toString();

private transient DataMapChooser dataMapChooser;

private boolean isWriteToFile = true;

Expand Down Expand Up @@ -139,26 +142,16 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputS
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
distributable.getDistributable().getSegment().setCacheable(!isFallbackJob);
distributable.getDistributable().getSegment().setReadCommittedScope(readCommittedScope);
List<Segment> segmentsToLoad = new ArrayList<>();
segmentsToLoad.add(distributable.getDistributable().getSegment());
List<ExtendedBlocklet> blocklets = new ArrayList<>();
DataMapChooser dataMapChooser = null;
if (null != filterResolverIntf) {
dataMapChooser = new DataMapChooser(table);
}
if (dataMapLevel == null) {
TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
.getDataMap(table, distributable.getDistributable().getDataMapSchema());
dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
if (table.isTransactionalTable()) {
blocklets = defaultDataMap.prune(dataMaps, distributable.getDistributable(),
filterResolverIntf, partitions);
} else {
blocklets = defaultDataMap.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf),
partitions);
}
blocklets = defaultDataMap
.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
blocklets = DataMapUtil
.pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets,
dataMapChooser);
Expand Down Expand Up @@ -315,6 +308,13 @@ private void initReadCommittedScope() throws IOException {
}
}

/**
 * Reports whether this job has been marked as a fallback job.
 *
 * @return true when the fallback flag is set, false otherwise
 */
public boolean isFallbackJob() {
  return this.isFallbackJob;
}

/**
* @return Whether the job is to clear cached datamaps or not.
*/
Expand Down Expand Up @@ -374,10 +374,6 @@ public String getQueryId() {
return queryId;
}

/**
 * Assigns the query id for this format instance.
 *
 * @param queryId identifier to associate with this query
 */
public void setQueryId(String queryId) {
  this.queryId = queryId;
}

/**
 * @return the datamap name configured to be cleared on this format instance
 */
public String getDataMapToClear() {
  return this.dataMapToClear;
}
Expand All @@ -393,4 +389,18 @@ public boolean isWriteToFile() {
/**
 * Marks this job as a fallback job. The flag can only be raised, never
 * cleared, through this method.
 */
public void setFallbackJob() {
  this.isFallbackJob = true;
}

/**
 * Collects the segment numbers of all valid segments held by this format.
 *
 * @return list of segment numbers, one per entry in the validSegments field
 */
public List<String> getValidSegmentIds() {
  // Renamed the local so it no longer shadows the validSegments field,
  // and presized the list since the final size is known up front.
  List<String> segmentIds = new ArrayList<>(this.validSegments.size());
  for (Segment segment : this.validSegments) {
    segmentIds.add(segment.getSegmentNo());
  }
  return segmentIds;
}

/**
 * Initializes the transient DataMapChooser for this table, but only when a
 * filter resolver has been configured; otherwise the chooser is left unset.
 *
 * @throws IOException declared to propagate failures from chooser creation
 */
public void createDataMapChooser() throws IOException {
  if (filterResolverIntf == null) {
    return;
  }
  this.dataMapChooser = new DataMapChooser(table);
}
}
13 changes: 0 additions & 13 deletions core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
Expand Up @@ -69,11 +69,6 @@ public class Segment implements Serializable, Writable {

private long indexSize = 0;

/**
* Whether to cache the segment data maps in executors or not.
*/
private boolean isCacheable = true;

/**
 * No-arg constructor. NOTE(review): presumably required for Writable
 * deserialization / reflective instantiation since the class implements
 * Serializable and Writable — confirm against callers before removing.
 */
public Segment() {

}
Expand Down Expand Up @@ -287,14 +282,6 @@ public void setIndexSize(long indexSize) {
this.indexSize = indexSize;
}

/**
 * @return true when the segment data maps are to be cached in executors
 */
public boolean isCacheable() {
  return this.isCacheable;
}

/**
 * Controls whether the segment data maps may be cached in executors.
 *
 * @param cacheable true to allow caching of this segment's data maps
 */
public void setCacheable(boolean cacheable) {
  this.isCacheable = cacheable;
}

@Override public void write(DataOutput out) throws IOException {
out.writeUTF(segmentNo);
boolean writeSegmentFileName = segmentFileName != null;
Expand Down
Expand Up @@ -131,7 +131,7 @@ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable(), segment.getConfiguration(), segment.isCacheable()));
this.getCarbonTable()));
}
}
List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
Expand Down

0 comments on commit 18842b7

Please sign in to comment.