Skip to content

Commit

Permalink
Added addLineage in lineage master
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Sep 17, 2015
1 parent 184ce13 commit 7663fc3
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
Expand Up @@ -113,8 +113,7 @@ public void streamToJournalCheckpoint(JournalOutputStream outputStream) throws I
} }


public long createLineage(List<TachyonFile> inputFiles, List<TachyonFile> outputFiles, Job job) { public long createLineage(List<TachyonFile> inputFiles, List<TachyonFile> outputFiles, Job job) {
// TODO create lineage return mLineageStore.addLineage(inputFiles, outputFiles, job);
return -1;
} }


public boolean deleteLineage(long lineageId) { public boolean deleteLineage(long lineageId) {
Expand Down
Expand Up @@ -65,4 +65,8 @@ public LineageState getState() {
public void setState(LineageState newState) { public void setState(LineageState newState) {
mState = newState; mState = newState;
} }

public long getId() {
return mId;
}
} }
Expand Up @@ -14,12 +14,15 @@
*/ */
package tachyon.master.lineage.meta; package tachyon.master.lineage.meta;


import java.util.List;
import java.util.Map; import java.util.Map;


import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import tachyon.client.file.TachyonFile; import tachyon.client.file.TachyonFile;
import tachyon.dag.DAG; import tachyon.dag.DAG;
import tachyon.job.Job;


/** /**
* A store of lineages. * A store of lineages.
Expand All @@ -28,17 +31,42 @@ public final class LineageStore {
private DAG<Lineage> mLineageDAG; private DAG<Lineage> mLineageDAG;


/** Indices for lineages */ /** Indices for lineages */
private Map<TachyonFile, Lineage> mInputFileIndex; /** Index of the output files of lineage to lineage */
private Map<TachyonFile, Lineage> mOutputFileIndex; private Map<TachyonFile, Lineage> mOutputFileIndex;
private Map<LineageState, Lineage> mStateIndex; private Map<LineageState, Lineage> mStateIndex;
private Map<Long, Lineage> mIdIndex;


public LineageStore() { public LineageStore() {
mLineageDAG = new DAG<Lineage>(); mLineageDAG = new DAG<Lineage>();
mInputFileIndex = Maps.newHashMap();
mOutputFileIndex = Maps.newHashMap(); mOutputFileIndex = Maps.newHashMap();
mStateIndex = Maps.newHashMap(); mStateIndex = Maps.newHashMap();
mIdIndex = Maps.newHashMap();
} }


public long addLineage(List<TachyonFile> inputFiles, List<TachyonFile> outputFiles, Job job) {
Lineage lineage = new Lineage(inputFiles, outputFiles, job);


List<Lineage> parentLineages = Lists.newArrayList();
for(TachyonFile inputFile:inputFiles) {
if(mOutputFileIndex.containsKey(inputFile)) {
parentLineages.add(mOutputFileIndex.get(inputFile));
}
}


mLineageDAG.add(lineage, parentLineages);

// update index
for (TachyonFile outputFile : outputFiles) {
mOutputFileIndex.put(outputFile, lineage);
}
mStateIndex.put(lineage.getState(), lineage);
mIdIndex.put(lineage.getId(), lineage);

return lineage.getId();
}

private void updateState(Lineage lineage, LineageState newState) {
lineage.setState(newState);
mStateIndex.put(newState, lineage);
}
} }

0 comments on commit 7663fc3

Please sign in to comment.