Skip to content

Commit

Permalink
Removed lineage file state from lineage store
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Dec 11, 2015
1 parent f31d286 commit 248aac6
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 164 deletions.
10 changes: 4 additions & 6 deletions servers/src/main/java/tachyon/master/lineage/LineageMaster.java
Expand Up @@ -34,7 +34,6 @@


import tachyon.Constants; import tachyon.Constants;
import tachyon.TachyonURI; import tachyon.TachyonURI;
import tachyon.client.file.TachyonFile;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.exception.BlockInfoException; import tachyon.exception.BlockInfoException;
import tachyon.exception.ExceptionMessage; import tachyon.exception.ExceptionMessage;
Expand All @@ -59,7 +58,6 @@
import tachyon.master.lineage.checkpoint.CheckpointPlan; import tachyon.master.lineage.checkpoint.CheckpointPlan;
import tachyon.master.lineage.checkpoint.CheckpointSchedulingExcecutor; import tachyon.master.lineage.checkpoint.CheckpointSchedulingExcecutor;
import tachyon.master.lineage.meta.Lineage; import tachyon.master.lineage.meta.Lineage;
import tachyon.master.lineage.meta.LineageFile;
import tachyon.master.lineage.meta.LineageFileState; import tachyon.master.lineage.meta.LineageFileState;
import tachyon.master.lineage.meta.LineageIdGenerator; import tachyon.master.lineage.meta.LineageIdGenerator;
import tachyon.master.lineage.meta.LineageStore; import tachyon.master.lineage.meta.LineageStore;
Expand Down Expand Up @@ -219,18 +217,18 @@ public LineageStoreView getLineageStoreView() {
public synchronized long createLineage(List<TachyonURI> inputFiles, List<TachyonURI> outputFiles, public synchronized long createLineage(List<TachyonURI> inputFiles, List<TachyonURI> outputFiles,
Job job) throws InvalidPathException, FileAlreadyExistsException, BlockInfoException, Job job) throws InvalidPathException, FileAlreadyExistsException, BlockInfoException,
IOException { IOException {
List<TachyonFile> inputTachyonFiles = Lists.newArrayList(); List<Long> inputTachyonFiles = Lists.newArrayList();
for (TachyonURI inputFile : inputFiles) { for (TachyonURI inputFile : inputFiles) {
long fileId; long fileId;
fileId = mFileSystemMaster.getFileId(inputFile); fileId = mFileSystemMaster.getFileId(inputFile);
if (fileId == IdUtils.INVALID_FILE_ID) { if (fileId == IdUtils.INVALID_FILE_ID) {
throw new InvalidPathException( throw new InvalidPathException(
ExceptionMessage.LINEAGE_INPUT_FILE_NOT_EXIST.getMessage(inputFile)); ExceptionMessage.LINEAGE_INPUT_FILE_NOT_EXIST.getMessage(inputFile));
} }
inputTachyonFiles.add(new TachyonFile(fileId)); inputTachyonFiles.add(fileId);
} }
// create output files // create output files
List<LineageFile> outputTachyonFiles = Lists.newArrayList(); List<Long> outputTachyonFiles = Lists.newArrayList();
for (TachyonURI outputFile : outputFiles) { for (TachyonURI outputFile : outputFiles) {
long fileId; long fileId;
// TODO(yupeng): delete the placeholder files if the creation fails. // TODO(yupeng): delete the placeholder files if the creation fails.
Expand All @@ -239,7 +237,7 @@ public synchronized long createLineage(List<TachyonURI> inputFiles, List<Tachyon
new CreateOptions.Builder(MasterContext.getConf()).setRecursive(true) new CreateOptions.Builder(MasterContext.getConf()).setRecursive(true)
.setBlockSizeBytes(Constants.KB).build(); .setBlockSizeBytes(Constants.KB).build();
fileId = mFileSystemMaster.create(outputFile, options); fileId = mFileSystemMaster.create(outputFile, options);
outputTachyonFiles.add(new LineageFile(fileId)); outputTachyonFiles.add(fileId);
} }


LOG.info("Create lineage of input:{}, output:{}, job:{}", inputTachyonFiles, LOG.info("Create lineage of input:{}, output:{}, job:{}", inputTachyonFiles,
Expand Down
78 changes: 0 additions & 78 deletions servers/src/main/java/tachyon/master/lineage/meta/LineageFile.java

This file was deleted.

Expand Up @@ -20,12 +20,10 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;


import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;


import tachyon.client.file.TachyonFile;
import tachyon.collections.DirectedAcyclicGraph; import tachyon.collections.DirectedAcyclicGraph;
import tachyon.exception.ExceptionMessage; import tachyon.exception.ExceptionMessage;
import tachyon.exception.LineageDoesNotExistException; import tachyon.exception.LineageDoesNotExistException;
Expand Down Expand Up @@ -78,8 +76,8 @@ public synchronized void addLineageFromJournal(LineageEntry entry) {
* @param job the job * @param job the job
* @return the id of the created lineage * @return the id of the created lineage
*/ */
public synchronized long createLineage(List<TachyonFile> inputFiles, public synchronized long createLineage(List<Long> inputFiles,
List<LineageFile> outputFiles, Job job) { List<Long> outputFiles, Job job) {
long lineageId = mLineageIdGenerator.generateId(); long lineageId = mLineageIdGenerator.generateId();
Lineage lineage = new Lineage(lineageId, inputFiles, outputFiles, job); Lineage lineage = new Lineage(lineageId, inputFiles, outputFiles, job);
addLineageInternal(lineage); addLineageInternal(lineage);
Expand All @@ -88,31 +86,20 @@ public synchronized long createLineage(List<TachyonFile> inputFiles,


private void addLineageInternal(Lineage lineage) { private void addLineageInternal(Lineage lineage) {
List<Lineage> parentLineages = Lists.newArrayList(); List<Lineage> parentLineages = Lists.newArrayList();
for (TachyonFile inputFile : lineage.getInputFiles()) { for (long inputFile : lineage.getInputFiles()) {
if (mOutputFileIndex.containsKey(inputFile.getFileId())) { if (mOutputFileIndex.containsKey(inputFile)) {
parentLineages.add(mOutputFileIndex.get(inputFile.getFileId())); parentLineages.add(mOutputFileIndex.get(inputFile));
} }
} }
mLineageDAG.add(lineage, parentLineages); mLineageDAG.add(lineage, parentLineages);


// update index // update index
for (TachyonFile outputFile : lineage.getOutputFiles()) { for (long outputFile : lineage.getOutputFiles()) {
mOutputFileIndex.put(outputFile.getFileId(), lineage); mOutputFileIndex.put(outputFile, lineage);
} }
mIdIndex.put(lineage.getId(), lineage); mIdIndex.put(lineage.getId(), lineage);
} }


/**
* Completes an output file.
*
* @param fileId the file id
*/
public synchronized void completeFile(long fileId) {
Preconditions.checkState(mOutputFileIndex.containsKey(fileId));
Lineage lineage = mOutputFileIndex.get(fileId);
lineage.updateOutputFileState(fileId, LineageFileState.COMPLETED);
}

/** /**
* Deletes a lineage. * Deletes a lineage.
* *
Expand Down Expand Up @@ -142,24 +129,11 @@ private void deleteLineage(long lineageId, Set<Long> deleted)
mLineageDAG.deleteLeaf(toDelete); mLineageDAG.deleteLeaf(toDelete);
mIdIndex.remove(lineageId); mIdIndex.remove(lineageId);
deleted.add(lineageId); deleted.add(lineageId);
for (TachyonFile outputFile : toDelete.getOutputFiles()) { for (long outputFile : toDelete.getOutputFiles()) {
mOutputFileIndex.remove(outputFile.getFileId()); mOutputFileIndex.remove(outputFile);
} }
} }


/**
* Requests an output file as being persisted.
*
* @param fileId the file id
* @throws LineageDoesNotExistException if the lineage does not exist
*/
public synchronized void requestFilePersistence(long fileId) throws LineageDoesNotExistException {
LineageDoesNotExistException.check(mOutputFileIndex.containsKey(fileId),
ExceptionMessage.LINEAGE_OUTPUT_FILE_NOT_EXIST, fileId);
Lineage lineage = mOutputFileIndex.get(fileId);
lineage.updateOutputFileState(fileId, LineageFileState.PERSISENCE_REQUESTED);
}

/** /**
* Gets the lineage. * Gets the lineage.
* *
Expand Down Expand Up @@ -200,45 +174,13 @@ public synchronized List<Lineage> getParents(Lineage lineage)
return mLineageDAG.getParents(lineage); return mLineageDAG.getParents(lineage);
} }


/**
* Reports an output file as lost.
*
* @param fileId the file id
* @return the lineage containing the output file, null if no lineage outputs the given file
* @throws LineageDoesNotExistException
*/
public synchronized Lineage reportLostFile(long fileId) throws LineageDoesNotExistException {
Lineage lineage = mOutputFileIndex.get(fileId);
LineageDoesNotExistException.check(lineage != null, ExceptionMessage.LINEAGE_DOES_NOT_EXIST,
fileId);
// TODO(yupeng) push the persisted info to FS master
if (lineage.getOutputFileState(fileId) != LineageFileState.PERSISTED) {
lineage.updateOutputFileState(fileId, LineageFileState.LOST);
}
return lineage;
}

/** /**
* @return the list of all root lineages * @return the list of all root lineages
*/ */
public synchronized List<Lineage> getRootLineages() { public synchronized List<Lineage> getRootLineages() {
return mLineageDAG.getRoots(); return mLineageDAG.getRoots();
} }


/**
* Commits a file as persisted
*
* @param fileId the file id
* @throws LineageDoesNotExistException if the lineage does not exist
*/
public synchronized void commitFilePersistence(Long fileId) throws LineageDoesNotExistException {
LineageDoesNotExistException.check(mOutputFileIndex.containsKey(fileId),
ExceptionMessage.LINEAGE_OUTPUT_FILE_NOT_EXIST, fileId);

Lineage lineage = mOutputFileIndex.get(fileId);
lineage.updateOutputFileState(fileId, LineageFileState.PERSISTED);
}

/** /**
* Sorts a given set of lineages topologically. * Sorts a given set of lineages topologically.
* *
Expand Down Expand Up @@ -275,17 +217,4 @@ public synchronized void streamToJournalCheckpoint(JournalOutputStream outputStr
public boolean hasOutputFile(long fileId) { public boolean hasOutputFile(long fileId) {
return mOutputFileIndex.containsKey(fileId); return mOutputFileIndex.containsKey(fileId);
} }

/**
* @param fileId the fild id
* @return the lineage state of the given file
* @throws LineageDoesNotExistException if the file does not belong to any lineage
*/
public synchronized LineageFileState getLineageFileState(long fileId)
throws LineageDoesNotExistException {
if (!mOutputFileIndex.containsKey(fileId)) {
throw new LineageDoesNotExistException("No lineage has output file " + fileId);
}
return mOutputFileIndex.get(fileId).getOutputFileState(fileId);
}
} }

0 comments on commit 248aac6

Please sign in to comment.