Skip to content

Commit

Permalink
enabling global index for MOR
Browse files Browse the repository at this point in the history
  • Loading branch information
n3nash authored and vinothchandar committed May 16, 2018
1 parent dfc0c61 commit 23d5376
Showing 1 changed file with 13 additions and 6 deletions.
Expand Up @@ -39,6 +39,7 @@
import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieAppendHandle;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import java.io.IOException;
Expand Down Expand Up @@ -154,11 +155,10 @@ public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> comm
// Atomically un-publish all non-inflight commits
commitsAndCompactions.entrySet().stream().map(entry -> entry.getValue())
.filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight);

logger.info("Unpublished " + commits);

Long startTime = System.currentTimeMillis();

// TODO (NA) : remove this once HoodieIndex is a member of HoodieTable
HoodieIndex hoodieIndex = HoodieIndex.createIndex(config, jsc);
List<HoodieRollbackStat> allRollbackStats = jsc.parallelize(FSUtils
.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()))
Expand Down Expand Up @@ -195,17 +195,24 @@ public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> comm

// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
// This needs to be done since GlobalIndex at the moment does not store the latest commit time
Map<String, String> fileIdToLatestCommitTimeMap =
hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseCommitTime)) : null;
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> {
return wStat != null
&& wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
&& wStat.getPrevCommit() != null;
}).forEach(wStat -> {
HoodieLogFormat.Writer writer = null;
String baseCommitTime = wStat.getPrevCommit();
if (hoodieIndex.isGlobal()) {
baseCommitTime = fileIdToLatestCommitTimeMap.get(wStat.getFileId());
}
try {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
new Path(this.getMetaClient().getBasePath(), partitionPath))
.withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit())
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
.withFs(this.metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Long numRollbackBlocks = 0L;
Expand Down

0 comments on commit 23d5376

Please sign in to comment.