Skip to content

Commit

Permalink
Refactored the lineage entry
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Sep 24, 2015
1 parent 39094da commit 207af36
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 126 deletions.
Expand Up @@ -44,7 +44,5 @@ public enum JournalEntryType {
// Lineage master entries // Lineage master entries
ASYNC_COMPLETE_FILE, ASYNC_COMPLETE_FILE,
LINEAGE, LINEAGE,
LINEAGE_FILE,
LINEAGE_ID_GENERATOR, LINEAGE_ID_GENERATOR,
JOB,
} }
Expand Up @@ -40,7 +40,10 @@
import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import tachyon.TachyonURI; import tachyon.TachyonURI;
import tachyon.exception.ExceptionMessage; import tachyon.client.file.TachyonFile;
import tachyon.job.CommandLineJob;
import tachyon.job.Job;
import tachyon.job.JobConf;
import tachyon.master.block.journal.BlockContainerIdGeneratorEntry; import tachyon.master.block.journal.BlockContainerIdGeneratorEntry;
import tachyon.master.block.journal.BlockInfoEntry; import tachyon.master.block.journal.BlockInfoEntry;
import tachyon.master.file.journal.AddCheckpointEntry; import tachyon.master.file.journal.AddCheckpointEntry;
Expand All @@ -52,8 +55,14 @@
import tachyon.master.file.journal.InodeFileEntry; import tachyon.master.file.journal.InodeFileEntry;
import tachyon.master.file.journal.InodeLastModificationTimeEntry; import tachyon.master.file.journal.InodeLastModificationTimeEntry;
import tachyon.master.file.journal.RenameEntry; import tachyon.master.file.journal.RenameEntry;
import tachyon.master.file.journal.ResizeBlockEntry;
import tachyon.master.file.journal.SetPinnedEntry; import tachyon.master.file.journal.SetPinnedEntry;
import tachyon.master.file.meta.DependencyType; import tachyon.master.file.meta.DependencyType;
import tachyon.master.lineage.journal.AsyncCompleteFileEntry;
import tachyon.master.lineage.journal.LineageEntry;
import tachyon.master.lineage.journal.LineageIdGeneratorEntry;
import tachyon.master.lineage.meta.LineageFile;
import tachyon.master.lineage.meta.LineageFileState;
import tachyon.master.rawtable.journal.RawTableEntry; import tachyon.master.rawtable.journal.RawTableEntry;
import tachyon.master.rawtable.journal.UpdateMetadataEntry; import tachyon.master.rawtable.journal.UpdateMetadataEntry;


Expand Down Expand Up @@ -342,6 +351,11 @@ public JournalEntry getNextEntry() throws IOException {
entry.getLong("containerId"), entry.getLong("containerId"),
entry.getLong("sequenceNumber")); entry.getLong("sequenceNumber"));
} }
case RESIZE_BLOCK: {
return new ResizeBlockEntry(
entry.getString("path"),
entry.getLong("blockSizeBytes"));
}


// RawTable // RawTable
case RAW_TABLE: { case RAW_TABLE: {
Expand All @@ -355,6 +369,42 @@ public JournalEntry getNextEntry() throws IOException {
entry.getLong("id"), entry.getLong("id"),
entry.getByteBuffer("metadata")); entry.getByteBuffer("metadata"));
} }

// Lineage
case ASYNC_COMPLETE_FILE: {
return new AsyncCompleteFileEntry(
entry.getLong("fileId"),
entry.getString("underFsPath"));
}
case LINEAGE: {
List<TachyonFile> inputFiles = Lists.newArrayList();
for (long fileId : entry.get("inputFiles", new TypeReference<List<Long>>() {})) {
inputFiles.add(new TachyonFile(fileId));
}
List<LineageFile> outputFiles = Lists.newArrayList();
List<Long> outputFileIds =
entry.get("outputFileIds", new TypeReference<List<Long>>() {});
List<LineageFileState> outputFileStates =
entry.get("outputFileStates", new TypeReference<List<LineageFileState>>() {});
List<String> outputFileUnderFsPaths =
entry.get("outputFileUnderFsPaths", new TypeReference<List<String>>() {});
for (int i = 0; i < outputFileIds.size(); i ++) {
outputFiles.add(new LineageFile(outputFileIds.get(i), outputFileStates.get(i),
outputFileUnderFsPaths.get(i)));
}
Job job = new CommandLineJob(entry.getString("jobCommand"),
new JobConf(entry.getString("jobOutputPath")));
return new LineageEntry(
entry.getLong("id"),
inputFiles,
outputFiles,
job,
entry.getLong("creationTimeMs"));
}
case LINEAGE_ID_GENERATOR: {
return new LineageIdGeneratorEntry(
entry.getLong("sequenceNumber"));
}
default: default:
throw new IOException("Unknown journal entry type: " + entry.mType); throw new IOException("Unknown journal entry type: " + entry.mType);
} }
Expand Down
Expand Up @@ -17,7 +17,6 @@


import java.util.Map; import java.util.Map;


import com.google.common.base.Preconditions;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import tachyon.master.journal.JournalEntry; import tachyon.master.journal.JournalEntry;
Expand All @@ -29,7 +28,7 @@ public class AsyncCompleteFileEntry implements JournalEntry {


public AsyncCompleteFileEntry(long fileId, String underFsPath) { public AsyncCompleteFileEntry(long fileId, String underFsPath) {
mFileId = fileId; mFileId = fileId;
mUnderFsPath = Preconditions.checkNotNull(underFsPath); mUnderFsPath = underFsPath;
} }


public long getFileId() { public long getFileId() {
Expand Down
54 changes: 0 additions & 54 deletions servers/src/main/java/tachyon/master/lineage/journal/JobEntry.java

This file was deleted.

Expand Up @@ -25,16 +25,22 @@
import tachyon.client.file.TachyonFile; import tachyon.client.file.TachyonFile;
import tachyon.job.CommandLineJob; import tachyon.job.CommandLineJob;
import tachyon.job.Job; import tachyon.job.Job;
import tachyon.job.JobConf;
import tachyon.master.journal.JournalEntry; import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalEntryType; import tachyon.master.journal.JournalEntryType;
import tachyon.master.lineage.meta.Lineage; import tachyon.master.lineage.meta.Lineage;
import tachyon.master.lineage.meta.LineageFile; import tachyon.master.lineage.meta.LineageFile;
import tachyon.master.lineage.meta.LineageFileState;


public class LineageEntry implements JournalEntry { public class LineageEntry implements JournalEntry {
private final long mId; private final long mId;
private final List<Long> mInputFiles; private final List<Long> mInputFiles;
private final List<LineageFileEntry> mOutputFiles; private final List<Long> mOutputFileIds;
private final JobEntry mJob; // TODO(yupeng) allow journal entry to have nested class
private final List<LineageFileState> mOutputFileStates;
private final List<String> mOutputFileUnderFsPaths;
private final String mJobCommand;
private final String mJobOutputPath;
private final long mCreationTimeMs; private final long mCreationTimeMs;


public LineageEntry(long id, List<TachyonFile> inputFiles, List<LineageFile> outputFiles, Job job, public LineageEntry(long id, List<TachyonFile> inputFiles, List<LineageFile> outputFiles, Job job,
Expand All @@ -44,16 +50,19 @@ public LineageEntry(long id, List<TachyonFile> inputFiles, List<LineageFile> out
for (TachyonFile file : inputFiles) { for (TachyonFile file : inputFiles) {
mInputFiles.add(file.getFileId()); mInputFiles.add(file.getFileId());
} }
mOutputFiles = Lists.newArrayList(); mOutputFileIds = Lists.newArrayList();
mOutputFileStates = Lists.newArrayList();
mOutputFileUnderFsPaths = Lists.newArrayList();
for (LineageFile file : outputFiles) { for (LineageFile file : outputFiles) {
mOutputFiles mOutputFileIds.add(file.getFileId());
.add(new LineageFileEntry(file.getFileId(), file.getState(), file.getUnderFilePath())); mOutputFileStates.add(file.getState());
mOutputFileUnderFsPaths.add(file.getUnderFilePath());
} }
// TODO(yupeng) support other job types // TODO(yupeng) support other job types
Preconditions.checkState(job instanceof CommandLineJob); Preconditions.checkState(job instanceof CommandLineJob);
CommandLineJob commandLineJob = (CommandLineJob) job; CommandLineJob commandLineJob = (CommandLineJob) job;
mJob = mJobCommand = commandLineJob.getCommand();
new JobEntry(commandLineJob.getJobConf().getOutputFilePath(), commandLineJob.getCommand()); mJobOutputPath = commandLineJob.getJobConf().getOutputFilePath();
mCreationTimeMs = creationTimeMs; mCreationTimeMs = creationTimeMs;
} }


Expand All @@ -64,11 +73,12 @@ public Lineage toLineage() {
} }


List<LineageFile> outputFiles = Lists.newArrayList(); List<LineageFile> outputFiles = Lists.newArrayList();
for (LineageFileEntry lineageFileEntry : mOutputFiles) { for (int i = 0; i < mOutputFileIds.size(); i ++) {
outputFiles.add(lineageFileEntry.toLineageFile()); outputFiles.add(new LineageFile(mOutputFileIds.get(i), mOutputFileStates.get(i),
mOutputFileUnderFsPaths.get(i)));
} }


Job job = mJob.toJob(); Job job = new CommandLineJob(mJobCommand, new JobConf(mJobOutputPath));


return new Lineage(mId, inputFiles, outputFiles, job, mCreationTimeMs); return new Lineage(mId, inputFiles, outputFiles, job, mCreationTimeMs);
} }
Expand All @@ -83,8 +93,11 @@ public Map<String, Object> getParameters() {
Map<String, Object> parameters = Maps.newHashMapWithExpectedSize(5); Map<String, Object> parameters = Maps.newHashMapWithExpectedSize(5);
parameters.put("id", mId); parameters.put("id", mId);
parameters.put("inputFiles", mInputFiles); parameters.put("inputFiles", mInputFiles);
parameters.put("outputFiles", mOutputFiles); parameters.put("outputFileIds", mOutputFileIds);
parameters.put("job", mJob); parameters.put("outputFileStates", mOutputFileStates);
parameters.put("outputFileUnderFsPaths", mOutputFileStates);
parameters.put("jobCommand", mJobCommand);
parameters.put("jobOutputPath", mJobOutputPath);
parameters.put("creationTimeMs", mCreationTimeMs); parameters.put("creationTimeMs", mCreationTimeMs);
return parameters; return parameters;
} }
Expand Down

This file was deleted.

0 comments on commit 207af36

Please sign in to comment.