Skip to content

Commit

Permalink
Introduced lineage file
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Sep 18, 2015
1 parent cb61373 commit aa0d537
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 20 deletions.
Expand Up @@ -18,7 +18,7 @@
/** /**
* A file handler for a file in Tachyon. It is a wrapper around the file ID for now. * A file handler for a file in Tachyon. It is a wrapper around the file ID for now.
*/ */
public final class TachyonFile { public class TachyonFile {
private final long mFileId; private final long mFileId;


/** /**
Expand Down
11 changes: 11 additions & 0 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -473,6 +473,17 @@ InodeTree.CreatePathResult createFileInternal(TachyonURI path, long blockSizeByt
return createResult; return createResult;
} }


/**
* Resets the block size of an existing open file.
* @param path the path to the file
* @param blockSizeBytes the new block size
* @return the file id
* @throws InvalidPathException
*/
public long resetBlockSize(TachyonURI path, long blockSizeBytes) throws InvalidPathException {
// TODO(yupeng): add validation
return mInodeTree.resetBlockSize(path, blockSizeBytes);
}
/** /**
* Returns the next block id for a given file id. Called via RPC. * Returns the next block id for a given file id. Called via RPC.
* *
Expand Down
10 changes: 9 additions & 1 deletion servers/src/main/java/tachyon/master/file/meta/InodeFile.java
Expand Up @@ -32,7 +32,7 @@
*/ */
public final class InodeFile extends Inode { public final class InodeFile extends Inode {
private final long mBlockContainerId; private final long mBlockContainerId;
private final long mBlockSizeBytes; private long mBlockSizeBytes;


// list of block ids. // list of block ids.
private List<Long> mBlocks; private List<Long> mBlocks;
Expand Down Expand Up @@ -84,6 +84,14 @@ public FileInfo generateClientFileInfo(String path) {
return ret; return ret;
} }


/**
* Resets the block size
*/
public void resetBlockSize(long blockSizeBytes) {
// TODO(yupeng): add validation
mBlockSizeBytes = blockSizeBytes;
}

/** /**
* @return a duplication of all the block ids of the file * @return a duplication of all the block ids of the file
*/ */
Expand Down
14 changes: 14 additions & 0 deletions servers/src/main/java/tachyon/master/file/meta/InodeTree.java
Expand Up @@ -286,6 +286,20 @@ public CreatePathResult createPath(TachyonURI path, long blockSizeBytes, boolean
return new CreatePathResult(modifiedInodes, createdInodes); return new CreatePathResult(modifiedInodes, createdInodes);
} }


/**
* Resets the block size of an existing open file.
* @param path the path to the file
* @param blockSizeBytes the new block size
* @return the file id
* @throws InvalidPathException
*/
public long resetBlockSize(TachyonURI path, long blockSizeBytes) throws InvalidPathException {
// TODO(yupeng): add validation
InodeFile file = (InodeFile)getInodeByPath(path);
file.resetBlockSize(blockSizeBytes);
return file.getId();
}

/** /**
* Returns a list of all descendants of a particular {@link InodeDirectory}. Any directory inode * Returns a list of all descendants of a particular {@link InodeDirectory}. Any directory inode
* precedes its descendants in the list. * precedes its descendants in the list.
Expand Down
12 changes: 9 additions & 3 deletions servers/src/main/java/tachyon/master/lineage/LineageMaster.java
Expand Up @@ -38,6 +38,7 @@
import tachyon.master.journal.JournalOutputStream; import tachyon.master.journal.JournalOutputStream;
import tachyon.master.lineage.checkpoint.CheckpointPlanningExecutor; import tachyon.master.lineage.checkpoint.CheckpointPlanningExecutor;
import tachyon.master.lineage.meta.Lineage; import tachyon.master.lineage.meta.Lineage;
import tachyon.master.lineage.meta.LineageFile;
import tachyon.master.lineage.meta.LineageStore; import tachyon.master.lineage.meta.LineageStore;
import tachyon.master.lineage.recompute.RecomputeExecutor; import tachyon.master.lineage.recompute.RecomputeExecutor;
import tachyon.master.lineage.recompute.RecomputeLauncher; import tachyon.master.lineage.recompute.RecomputeLauncher;
Expand Down Expand Up @@ -134,12 +135,12 @@ public long createLineage(List<TachyonURI> inputFiles, List<TachyonURI> outputFi
} }
} }
// create output files // create output files
List<TachyonFile> outputTachyonFiles = Lists.newArrayList(); List<LineageFile> outputTachyonFiles = Lists.newArrayList();
for (TachyonURI outputFile : outputFiles) { for (TachyonURI outputFile : outputFiles) {
long fileId; long fileId;
try { try {
fileId = mFileSystemMaster.createFile(outputFile, 0, true); fileId = mFileSystemMaster.createFile(outputFile, 0, true);
outputTachyonFiles.add(new TachyonFile(fileId)); outputTachyonFiles.add(new LineageFile(fileId));
} catch (InvalidPathException e) { } catch (InvalidPathException e) {
// TODO error handling // TODO error handling
} catch (FileAlreadyExistException e) { } catch (FileAlreadyExistException e) {
Expand Down Expand Up @@ -168,7 +169,12 @@ public boolean deleteLineage(long lineageId, boolean cascade) {
} }


public long recreateFile(String path, long blockSizeBytes) { public long recreateFile(String path, long blockSizeBytes) {
return -1; try {
return mFileSystemMaster.resetBlockSize(new TachyonURI(path), blockSizeBytes);
} catch (InvalidPathException e) {
// TODO(yupeng): error handling
return -1;
}
} }


public void asyncCompleteFile(long fileId, String filePath) { public void asyncCompleteFile(long fileId, String filePath) {
Expand Down
17 changes: 3 additions & 14 deletions servers/src/main/java/tachyon/master/lineage/meta/Lineage.java
Expand Up @@ -17,10 +17,8 @@


import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;


import tachyon.client.file.TachyonFile; import tachyon.client.file.TachyonFile;
import tachyon.job.Job; import tachyon.job.Job;
Expand All @@ -30,14 +28,9 @@
* and the output files the job generates. * and the output files the job generates.
*/ */
public final class Lineage { public final class Lineage {
enum FileState {
CREATED, ADDED, CHECKPOINTED, LOST
}

private final long mId; private final long mId;
private final List<TachyonFile> mInputFiles; private final List<TachyonFile> mInputFiles;
private final List<TachyonFile> mOutputFiles; private final List<LineageFile> mOutputFiles;
private final Map<TachyonFile, FileState> mOutputFilesState;
private final Job mJob; private final Job mJob;


private LineageState mState; private LineageState mState;
Expand All @@ -49,13 +42,9 @@ enum FileState {
* @param outputFiles the output files. * @param outputFiles the output files.
* @param job the job * @param job the job
*/ */
public Lineage(List<TachyonFile> inputFiles, List<TachyonFile> outputFiles, Job job) { public Lineage(List<TachyonFile> inputFiles, List<LineageFile> outputFiles, Job job) {
mInputFiles = Preconditions.checkNotNull(inputFiles); mInputFiles = Preconditions.checkNotNull(inputFiles);
mOutputFiles = Preconditions.checkNotNull(outputFiles); mOutputFiles = Preconditions.checkNotNull(outputFiles);
mOutputFilesState = Maps.newHashMap();
for (TachyonFile tachyonFile : outputFiles) {
mOutputFilesState.put(tachyonFile, FileState.CREATED);
}
mJob = Preconditions.checkNotNull(job); mJob = Preconditions.checkNotNull(job);
mState = LineageState.ADDED; mState = LineageState.ADDED;
mId = LineageIdGenerator.generateId(); mId = LineageIdGenerator.generateId();
Expand All @@ -65,7 +54,7 @@ public List<TachyonFile> getInputFiles() {
return Collections.unmodifiableList(mInputFiles); return Collections.unmodifiableList(mInputFiles);
} }


public List<TachyonFile> getOutputFiles() { public List<LineageFile> getOutputFiles() {
return Collections.unmodifiableList(mOutputFiles); return Collections.unmodifiableList(mOutputFiles);
} }


Expand Down
44 changes: 44 additions & 0 deletions servers/src/main/java/tachyon/master/lineage/meta/LineageFile.java
@@ -0,0 +1,44 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.master.lineage.meta;

import tachyon.client.file.TachyonFile;

public final class LineageFile extends TachyonFile {
private LineageFileState mState;
private String mUnderFilePath;

public LineageFile(long fileId) {
super(fileId);
mState = LineageFileState.ADDED;
}

public String getUnderFilePath() {
return mUnderFilePath;
}

public void setUnderFilePath(String path) {
mUnderFilePath = path;
}

public LineageFileState getState() {
return mState;
}

public void setState(LineageFileState state) {
mState = state;
}
}
@@ -0,0 +1,23 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.master.lineage.meta;

/**
* The state of a lineage file.
*/
public enum LineageFileState {
ADDED, RECORDED, CHECKPOINTED, LOST
}
Expand Up @@ -46,7 +46,7 @@ public LineageStore() {
mIdIndex = Maps.newHashMap(); mIdIndex = Maps.newHashMap();
} }


public long addLineage(List<TachyonFile> inputFiles, List<TachyonFile> outputFiles, Job job) { public long addLineage(List<TachyonFile> inputFiles, List<LineageFile> outputFiles, Job job) {
Lineage lineage = new Lineage(inputFiles, outputFiles, job); Lineage lineage = new Lineage(inputFiles, outputFiles, job);


List<Lineage> parentLineages = Lists.newArrayList(); List<Lineage> parentLineages = Lists.newArrayList();
Expand Down

0 comments on commit aa0d537

Please sign in to comment.