Skip to content

Commit

Permalink
Added lineage file recording in lineage master
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Sep 19, 2015
1 parent aa0d537 commit f1cb755
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
17 changes: 17 additions & 0 deletions servers/src/main/java/tachyon/master/lineage/meta/Lineage.java
Expand Up @@ -73,4 +73,21 @@ public void setState(LineageState newState) {
public long getId() { public long getId() {
return mId; return mId;
} }

public void recordOutputFile(long fileId) {
boolean allRecorded = true;
for (LineageFile outputFile : mOutputFiles) {
if (outputFile.getFileId() == fileId) {
outputFile.setState(LineageFileState.RECORDED);
}

if (outputFile.getState() != LineageFileState.RECORDED) {
allRecorded = false;
}
}

if (allRecorded) {
mState = LineageState.RECORDED;
}
}
} }
Expand Up @@ -16,12 +16,10 @@


import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;


import tachyon.client.file.TachyonFile; import tachyon.client.file.TachyonFile;
import tachyon.dag.DAG; import tachyon.dag.DAG;
Expand All @@ -35,14 +33,12 @@ public final class LineageStore {


/** Indices for lineages */ /** Indices for lineages */
/** Index of the output files of lineage to lineage */ /** Index of the output files of lineage to lineage */
private Map<TachyonFile, Lineage> mOutputFileIndex; private Map<Long, Lineage> mOutputFileIndex;
private Map<LineageState, Set<Lineage>> mStateIndex;
private Map<Long, Lineage> mIdIndex; private Map<Long, Lineage> mIdIndex;


public LineageStore() { public LineageStore() {
mLineageDAG = new DAG<Lineage>(); mLineageDAG = new DAG<Lineage>();
mOutputFileIndex = Maps.newHashMap(); mOutputFileIndex = Maps.newHashMap();
mStateIndex = Maps.newHashMap();
mIdIndex = Maps.newHashMap(); mIdIndex = Maps.newHashMap();
} }


Expand All @@ -60,14 +56,19 @@ public long addLineage(List<TachyonFile> inputFiles, List<LineageFile> outputFil


// update index // update index
for (TachyonFile outputFile : outputFiles) { for (TachyonFile outputFile : outputFiles) {
mOutputFileIndex.put(outputFile, lineage); mOutputFileIndex.put(outputFile.getFileId(), lineage);
} }
updateState(lineage, lineage.getState());
mIdIndex.put(lineage.getId(), lineage); mIdIndex.put(lineage.getId(), lineage);


return lineage.getId(); return lineage.getId();
} }


public void recordFileForAsyncWrite(long fileId, String underFsPath) {
Preconditions.checkState(mOutputFileIndex.containsKey(fileId));
Lineage lineage = mOutputFileIndex.get(fileId);
lineage.recordOutputFile(fileId);
}

public void deleteLineage(long lineageId) { public void deleteLineage(long lineageId) {
Preconditions.checkState(mIdIndex.containsKey(lineageId), Preconditions.checkState(mIdIndex.containsKey(lineageId),
"lineage id " + lineageId + " does not exist"); "lineage id " + lineageId + " does not exist");
Expand All @@ -84,7 +85,6 @@ public void deleteLineage(long lineageId) {
for (TachyonFile outputFile : toDelete.getOutputFiles()) { for (TachyonFile outputFile : toDelete.getOutputFiles()) {
mOutputFileIndex.remove(outputFile); mOutputFileIndex.remove(outputFile);
} }
mStateIndex.get(toDelete.getState()).remove(toDelete);
} }


public Lineage getLineage(long lineageId) { public Lineage getLineage(long lineageId) {
Expand All @@ -97,12 +97,4 @@ public List<Lineage> getChildren(Lineage lineage) {


return mLineageDAG.getChildren(lineage); return mLineageDAG.getChildren(lineage);
} }

private void updateState(Lineage lineage, LineageState newState) {
lineage.setState(newState);
if (!mStateIndex.containsKey(newState)) {
mStateIndex.put(newState, Sets.<Lineage>newHashSet());
}
mStateIndex.get(newState).add(lineage);
}
} }

0 comments on commit f1cb755

Please sign in to comment.