From 5daf302e273827e3b1af0ad45098bd3acc95698f Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Sat, 12 Dec 2015 13:24:37 -0800 Subject: [PATCH] Moved lineage worker to file worker --- .../main/java/tachyon/thrift/BlockInfo.java | 2 +- .../java/tachyon/thrift/BlockLocation.java | 2 +- .../thrift/BlockMasterClientService.java | 2 +- .../thrift/BlockMasterWorkerService.java | 2 +- .../src/main/java/tachyon/thrift/Command.java | 2 +- .../tachyon/thrift/CommandLineJobInfo.java | 2 +- .../tachyon/thrift/CompleteFileTOptions.java | 2 +- .../java/tachyon/thrift/CreateTOptions.java | 2 +- .../java/tachyon/thrift/DependencyInfo.java | 2 +- .../java/tachyon/thrift/FileBlockInfo.java | 2 +- .../main/java/tachyon/thrift/FileInfo.java | 2 +- .../thrift/FileSystemMasterClientService.java | 2 +- .../thrift/FileSystemMasterWorkerService.java | 40 ++++---- .../main/java/tachyon/thrift/JobConfInfo.java | 2 +- .../java/tachyon/thrift/LineageFileInfo.java | 2 +- .../main/java/tachyon/thrift/LineageInfo.java | 2 +- .../thrift/LineageMasterClientService.java | 2 +- .../java/tachyon/thrift/MkdirTOptions.java | 2 +- .../main/java/tachyon/thrift/NetAddress.java | 2 +- .../java/tachyon/thrift/PersistCommand.java | 2 +- .../main/java/tachyon/thrift/PersistFile.java | 2 +- .../java/tachyon/thrift/RawTableInfo.java | 2 +- .../thrift/RawTableMasterClientService.java | 2 +- .../main/java/tachyon/thrift/RpcOptions.java | 2 +- .../java/tachyon/thrift/SetStateTOptions.java | 2 +- .../java/tachyon/thrift/TachyonService.java | 2 +- .../tachyon/thrift/TachyonTException.java | 2 +- .../tachyon/thrift/ThriftIOException.java | 2 +- .../main/java/tachyon/thrift/WorkerInfo.java | 2 +- .../java/tachyon/thrift/WorkerService.java | 2 +- common/src/thrift/file_system_master.thrift | 4 +- .../master/AbstractLocalTachyonCluster.java | 6 +- .../tachyon/master/LocalTachyonCluster.java | 4 +- .../java/tachyon/worker/TachyonWorker.java | 20 ++-- .../FileDataManager.java} | 8 +- .../worker/file/FileSystemMasterClient.java | 21 +++++ .../FileWorker.java} | 32 +++---- .../FileWorkerMasterSyncExecutor.java} | 48 +++++----- .../worker/lineage/LineageMasterClient.java | 91 ------------------- .../lineage/LineageDataManagerTest.java | 7 +- 40 files changed, 132 insertions(+), 207 deletions(-) rename servers/src/main/java/tachyon/worker/{lineage/LineageDataManager.java => file/FileDataManager.java} (96%) rename servers/src/main/java/tachyon/worker/{lineage/LineageWorker.java => file/FileWorker.java} (70%) rename servers/src/main/java/tachyon/worker/{lineage/LineageWorkerMasterSyncExecutor.java => file/FileWorkerMasterSyncExecutor.java} (68%) delete mode 100644 servers/src/main/java/tachyon/worker/lineage/LineageMasterClient.java diff --git a/common/src/main/java/tachyon/thrift/BlockInfo.java b/common/src/main/java/tachyon/thrift/BlockInfo.java index 7f3b96d2fb1a..7672c2c95529 100644 --- a/common/src/main/java/tachyon/thrift/BlockInfo.java +++ b/common/src/main/java/tachyon/thrift/BlockInfo.java @@ -38,7 +38,7 @@ * Contains the information of a block in Tachyon. It maintains the worker nodes where the replicas * of the blocks are stored. */ -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class BlockInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BlockInfo"); diff --git a/common/src/main/java/tachyon/thrift/BlockLocation.java b/common/src/main/java/tachyon/thrift/BlockLocation.java index 079a19b46902..8780a19bb182 100644 --- a/common/src/main/java/tachyon/thrift/BlockLocation.java +++ b/common/src/main/java/tachyon/thrift/BlockLocation.java @@ -37,7 +37,7 @@ /** * Information about blocks. */ -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class BlockLocation implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BlockLocation"); diff --git a/common/src/main/java/tachyon/thrift/BlockMasterClientService.java b/common/src/main/java/tachyon/thrift/BlockMasterClientService.java index c207bc1173b5..e8e9a4369090 100644 --- a/common/src/main/java/tachyon/thrift/BlockMasterClientService.java +++ b/common/src/main/java/tachyon/thrift/BlockMasterClientService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class BlockMasterClientService { /** diff --git a/common/src/main/java/tachyon/thrift/BlockMasterWorkerService.java b/common/src/main/java/tachyon/thrift/BlockMasterWorkerService.java index 8f0940a84059..87bacce2b3ba 100644 --- a/common/src/main/java/tachyon/thrift/BlockMasterWorkerService.java +++ b/common/src/main/java/tachyon/thrift/BlockMasterWorkerService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class BlockMasterWorkerService { /** diff --git a/common/src/main/java/tachyon/thrift/Command.java b/common/src/main/java/tachyon/thrift/Command.java index da912894d754..50a80b798870 100644 --- a/common/src/main/java/tachyon/thrift/Command.java +++ b/common/src/main/java/tachyon/thrift/Command.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class Command implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Command"); diff --git a/common/src/main/java/tachyon/thrift/CommandLineJobInfo.java b/common/src/main/java/tachyon/thrift/CommandLineJobInfo.java index e247f2d6e013..a2b56a41ddde 100644 --- a/common/src/main/java/tachyon/thrift/CommandLineJobInfo.java +++ b/common/src/main/java/tachyon/thrift/CommandLineJobInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class CommandLineJobInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CommandLineJobInfo"); diff --git a/common/src/main/java/tachyon/thrift/CompleteFileTOptions.java b/common/src/main/java/tachyon/thrift/CompleteFileTOptions.java index e6551c3be158..1afc1d57423d 100644 --- a/common/src/main/java/tachyon/thrift/CompleteFileTOptions.java +++ b/common/src/main/java/tachyon/thrift/CompleteFileTOptions.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class CompleteFileTOptions implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CompleteFileTOptions"); diff --git a/common/src/main/java/tachyon/thrift/CreateTOptions.java b/common/src/main/java/tachyon/thrift/CreateTOptions.java index 5ae71552ea6d..babc396c00d4 100644 --- a/common/src/main/java/tachyon/thrift/CreateTOptions.java +++ b/common/src/main/java/tachyon/thrift/CreateTOptions.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class CreateTOptions implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CreateTOptions"); diff --git a/common/src/main/java/tachyon/thrift/DependencyInfo.java b/common/src/main/java/tachyon/thrift/DependencyInfo.java index e9110b91d206..3a1ce721a5a1 100644 --- a/common/src/main/java/tachyon/thrift/DependencyInfo.java +++ b/common/src/main/java/tachyon/thrift/DependencyInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class DependencyInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("DependencyInfo"); diff --git a/common/src/main/java/tachyon/thrift/FileBlockInfo.java b/common/src/main/java/tachyon/thrift/FileBlockInfo.java index 9f6ca9ed8603..58604d3caa40 100644 --- a/common/src/main/java/tachyon/thrift/FileBlockInfo.java +++ b/common/src/main/java/tachyon/thrift/FileBlockInfo.java @@ -38,7 +38,7 @@ * Contains the information of a block in a file. In addition to the BlockInfo, it includes the * offset in the file, and the under file system locations of the block replicas. */ -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class FileBlockInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("FileBlockInfo"); diff --git a/common/src/main/java/tachyon/thrift/FileInfo.java b/common/src/main/java/tachyon/thrift/FileInfo.java index c597c8740587..09695d421eb3 100644 --- a/common/src/main/java/tachyon/thrift/FileInfo.java +++ b/common/src/main/java/tachyon/thrift/FileInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class FileInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("FileInfo"); diff --git a/common/src/main/java/tachyon/thrift/FileSystemMasterClientService.java b/common/src/main/java/tachyon/thrift/FileSystemMasterClientService.java index 8888fba18de4..5819996b7e12 100644 --- a/common/src/main/java/tachyon/thrift/FileSystemMasterClientService.java +++ b/common/src/main/java/tachyon/thrift/FileSystemMasterClientService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class FileSystemMasterClientService { /** diff --git a/common/src/main/java/tachyon/thrift/FileSystemMasterWorkerService.java b/common/src/main/java/tachyon/thrift/FileSystemMasterWorkerService.java index f4cd97748b29..08f68671b8c9 100644 --- a/common/src/main/java/tachyon/thrift/FileSystemMasterWorkerService.java +++ b/common/src/main/java/tachyon/thrift/FileSystemMasterWorkerService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class FileSystemMasterWorkerService { /** @@ -50,14 +50,14 @@ public interface Iface extends tachyon.thrift.TachyonService.Iface { public Set getPinIdList() throws org.apache.thrift.TException; /** - * Periodic lineage worker heartbeat. Returns the command for checkpointing + * Periodic lineage worker heartbeat. Returns the command for persisting * the blocks of a file. * * @param workerId the id of the worker * * @param persistedFiles the list of persisted files */ - public tachyon.thrift.Command heartbeat(long workerId, List persistedFiles) throws tachyon.thrift.TachyonTException, org.apache.thrift.TException; + public PersistCommand heartbeat(long workerId, List persistedFiles) throws tachyon.thrift.TachyonTException, org.apache.thrift.TException; } @@ -139,7 +139,7 @@ public Set recv_getPinIdList() throws org.apache.thrift.TException throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getPinIdList failed: unknown result"); } - public tachyon.thrift.Command heartbeat(long workerId, List persistedFiles) throws tachyon.thrift.TachyonTException, org.apache.thrift.TException + public PersistCommand heartbeat(long workerId, List persistedFiles) throws tachyon.thrift.TachyonTException, org.apache.thrift.TException { send_heartbeat(workerId, persistedFiles); return recv_heartbeat(); @@ -153,7 +153,7 @@ public void send_heartbeat(long workerId, List persistedFiles) throws org. sendBase("heartbeat", args); } - public tachyon.thrift.Command recv_heartbeat() throws tachyon.thrift.TachyonTException, org.apache.thrift.TException + public PersistCommand recv_heartbeat() throws tachyon.thrift.TachyonTException, org.apache.thrift.TException { heartbeat_result result = new heartbeat_result(); receiveBase(result, "heartbeat"); @@ -270,7 +270,7 @@ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apa prot.writeMessageEnd(); } - public tachyon.thrift.Command getResult() throws tachyon.thrift.TachyonTException, org.apache.thrift.TException { + public PersistCommand getResult() throws tachyon.thrift.TachyonTException, org.apache.thrift.TException { if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new IllegalStateException("Method call not finished!"); } @@ -494,7 +494,7 @@ public void start(I iface, getPinIdList_args args, org.apache.thrift.async.Async } } - public static class heartbeat extends org.apache.thrift.AsyncProcessFunction { + public static class heartbeat extends org.apache.thrift.AsyncProcessFunction { public heartbeat() { super("heartbeat"); } @@ -503,10 +503,10 @@ public heartbeat_args getEmptyArgsInstance() { return new heartbeat_args(); } - public AsyncMethodCallback getResultHandler(final AsyncFrameBuffer fb, final int seqid) { + public AsyncMethodCallback getResultHandler(final AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new AsyncMethodCallback() { - public void onComplete(tachyon.thrift.Command o) { + return new AsyncMethodCallback() { + public void onComplete(PersistCommand o) { heartbeat_result result = new heartbeat_result(); result.success = o; try { @@ -546,7 +546,7 @@ protected boolean isOneway() { return false; } - public void start(I iface, heartbeat_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws TException { + public void start(I iface, heartbeat_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws TException { iface.heartbeat(args.workerId, args.persistedFiles,resultHandler); } } @@ -2604,7 +2604,7 @@ public static class heartbeat_result implements org.apache.thrift.TBase tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, tachyon.thrift.Command.class))); + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, PersistCommand.class))); tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT))); metaDataMap = Collections.unmodifiableMap(tmpMap); @@ -2684,7 +2684,7 @@ public heartbeat_result() { } public heartbeat_result( - tachyon.thrift.Command success, + PersistCommand success, tachyon.thrift.TachyonTException e) { this(); @@ -2697,7 +2697,7 @@ public heartbeat_result( */ public heartbeat_result(heartbeat_result other) { if (other.isSetSuccess()) { - this.success = new tachyon.thrift.Command(other.success); + this.success = new PersistCommand(other.success); } if (other.isSetE()) { this.e = new tachyon.thrift.TachyonTException(other.e); @@ -2714,11 +2714,11 @@ public void clear() { this.e = null; } - public tachyon.thrift.Command getSuccess() { + public PersistCommand getSuccess() { return this.success; } - public heartbeat_result setSuccess(tachyon.thrift.Command success) { + public heartbeat_result setSuccess(PersistCommand success) { this.success = success; return this; } @@ -2768,7 +2768,7 @@ public void setFieldValue(_Fields field, Object value) { if (value == null) { unsetSuccess(); } else { - setSuccess((tachyon.thrift.Command)value); + setSuccess((PersistCommand)value); } break; @@ -2972,7 +2972,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, heartbeat_result st switch (schemeField.id) { case 0: // SUCCESS if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { - struct.success = new tachyon.thrift.Command(); + struct.success = new PersistCommand(); struct.success.read(iprot); struct.setSuccessIsSet(true); } else { @@ -3051,7 +3051,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, heartbeat_result str TTupleProtocol iprot = (TTupleProtocol) prot; BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { - struct.success = new tachyon.thrift.Command(); + struct.success = new PersistCommand(); struct.success.read(iprot); struct.setSuccessIsSet(true); } diff --git a/common/src/main/java/tachyon/thrift/JobConfInfo.java b/common/src/main/java/tachyon/thrift/JobConfInfo.java index dec8cd972692..b1434a9ff5b1 100644 --- a/common/src/main/java/tachyon/thrift/JobConfInfo.java +++ b/common/src/main/java/tachyon/thrift/JobConfInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class JobConfInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("JobConfInfo"); diff --git a/common/src/main/java/tachyon/thrift/LineageFileInfo.java b/common/src/main/java/tachyon/thrift/LineageFileInfo.java index a56e0f9ff720..4c561edff26c 100644 --- a/common/src/main/java/tachyon/thrift/LineageFileInfo.java +++ b/common/src/main/java/tachyon/thrift/LineageFileInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class LineageFileInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LineageFileInfo"); diff --git a/common/src/main/java/tachyon/thrift/LineageInfo.java b/common/src/main/java/tachyon/thrift/LineageInfo.java index 4ac7392d53e2..fd3a3cc28f0a 100644 --- a/common/src/main/java/tachyon/thrift/LineageInfo.java +++ b/common/src/main/java/tachyon/thrift/LineageInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class LineageInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LineageInfo"); diff --git a/common/src/main/java/tachyon/thrift/LineageMasterClientService.java b/common/src/main/java/tachyon/thrift/LineageMasterClientService.java index 957ce447d381..e2a7469d47c0 100644 --- a/common/src/main/java/tachyon/thrift/LineageMasterClientService.java +++ b/common/src/main/java/tachyon/thrift/LineageMasterClientService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class LineageMasterClientService { /** diff --git a/common/src/main/java/tachyon/thrift/MkdirTOptions.java b/common/src/main/java/tachyon/thrift/MkdirTOptions.java index 1b0a3eb81478..f660b5cf5440 100644 --- a/common/src/main/java/tachyon/thrift/MkdirTOptions.java +++ b/common/src/main/java/tachyon/thrift/MkdirTOptions.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class MkdirTOptions implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("MkdirTOptions"); diff --git a/common/src/main/java/tachyon/thrift/NetAddress.java b/common/src/main/java/tachyon/thrift/NetAddress.java index dc19e1890177..f81e802825e4 100644 --- a/common/src/main/java/tachyon/thrift/NetAddress.java +++ b/common/src/main/java/tachyon/thrift/NetAddress.java @@ -37,7 +37,7 @@ /** * Information about workers. */ -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class NetAddress implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NetAddress"); diff --git a/common/src/main/java/tachyon/thrift/PersistCommand.java b/common/src/main/java/tachyon/thrift/PersistCommand.java index 10dafb58a4ab..bdbc7bc88a43 100644 --- a/common/src/main/java/tachyon/thrift/PersistCommand.java +++ b/common/src/main/java/tachyon/thrift/PersistCommand.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class PersistCommand implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("PersistCommand"); diff --git a/common/src/main/java/tachyon/thrift/PersistFile.java b/common/src/main/java/tachyon/thrift/PersistFile.java index f7d6cf0ed7b7..1a02c013ea1e 100644 --- a/common/src/main/java/tachyon/thrift/PersistFile.java +++ b/common/src/main/java/tachyon/thrift/PersistFile.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class PersistFile implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("PersistFile"); diff --git a/common/src/main/java/tachyon/thrift/RawTableInfo.java b/common/src/main/java/tachyon/thrift/RawTableInfo.java index 4c877b5a6a34..b9a74759b26f 100644 --- a/common/src/main/java/tachyon/thrift/RawTableInfo.java +++ b/common/src/main/java/tachyon/thrift/RawTableInfo.java @@ -37,7 +37,7 @@ /** * Information about raw tables. */ -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class RawTableInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RawTableInfo"); diff --git a/common/src/main/java/tachyon/thrift/RawTableMasterClientService.java b/common/src/main/java/tachyon/thrift/RawTableMasterClientService.java index 7798b7e8cb89..f4f0f505c470 100644 --- a/common/src/main/java/tachyon/thrift/RawTableMasterClientService.java +++ b/common/src/main/java/tachyon/thrift/RawTableMasterClientService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class RawTableMasterClientService { /** diff --git a/common/src/main/java/tachyon/thrift/RpcOptions.java b/common/src/main/java/tachyon/thrift/RpcOptions.java index 87ca5372407f..b3896e47e978 100644 --- a/common/src/main/java/tachyon/thrift/RpcOptions.java +++ b/common/src/main/java/tachyon/thrift/RpcOptions.java @@ -37,7 +37,7 @@ /** * Information about the RPC. */ -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class RpcOptions implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RpcOptions"); diff --git a/common/src/main/java/tachyon/thrift/SetStateTOptions.java b/common/src/main/java/tachyon/thrift/SetStateTOptions.java index cb8844f7e4b8..efa03dfb682e 100644 --- a/common/src/main/java/tachyon/thrift/SetStateTOptions.java +++ b/common/src/main/java/tachyon/thrift/SetStateTOptions.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class SetStateTOptions implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SetStateTOptions"); diff --git a/common/src/main/java/tachyon/thrift/TachyonService.java b/common/src/main/java/tachyon/thrift/TachyonService.java index ecc909fc25ce..002cd51598cd 100644 --- a/common/src/main/java/tachyon/thrift/TachyonService.java +++ b/common/src/main/java/tachyon/thrift/TachyonService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class TachyonService { public interface Iface { diff --git a/common/src/main/java/tachyon/thrift/TachyonTException.java b/common/src/main/java/tachyon/thrift/TachyonTException.java index 083c08c41211..9597bb5072c1 100644 --- a/common/src/main/java/tachyon/thrift/TachyonTException.java +++ b/common/src/main/java/tachyon/thrift/TachyonTException.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class TachyonTException extends TException implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TachyonTException"); diff --git a/common/src/main/java/tachyon/thrift/ThriftIOException.java b/common/src/main/java/tachyon/thrift/ThriftIOException.java index 03293de2601b..80fb2296e37d 100644 --- a/common/src/main/java/tachyon/thrift/ThriftIOException.java +++ b/common/src/main/java/tachyon/thrift/ThriftIOException.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class ThriftIOException extends TException implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftIOException"); diff --git a/common/src/main/java/tachyon/thrift/WorkerInfo.java b/common/src/main/java/tachyon/thrift/WorkerInfo.java index e672b1c94b17..c28200dc5aef 100644 --- a/common/src/main/java/tachyon/thrift/WorkerInfo.java +++ b/common/src/main/java/tachyon/thrift/WorkerInfo.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class WorkerInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("WorkerInfo"); diff --git a/common/src/main/java/tachyon/thrift/WorkerService.java b/common/src/main/java/tachyon/thrift/WorkerService.java index c65441a901d0..2f598947c1e2 100644 --- a/common/src/main/java/tachyon/thrift/WorkerService.java +++ b/common/src/main/java/tachyon/thrift/WorkerService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-11") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-12") public class WorkerService { public interface Iface extends tachyon.thrift.TachyonService.Iface { diff --git a/common/src/thrift/file_system_master.thrift b/common/src/thrift/file_system_master.thrift index 07eed33b1287..887aec2a7acd 100644 --- a/common/src/thrift/file_system_master.thrift +++ b/common/src/thrift/file_system_master.thrift @@ -198,10 +198,10 @@ service FileSystemMasterWorkerService extends common.TachyonService { set getPinIdList() /** - * Periodic lineage worker heartbeat. Returns the command for checkpointing + * Periodic lineage worker heartbeat. Returns the command for persisting * the blocks of a file. */ - common.Command heartbeat( /** the id of the worker */ 1: i64 workerId, + PersistCommand heartbeat( /** the id of the worker */ 1: i64 workerId, /** the list of persisted files */ 2: list persistedFiles) throws (1: exception.TachyonTException e) } diff --git a/minicluster/src/main/java/tachyon/master/AbstractLocalTachyonCluster.java b/minicluster/src/main/java/tachyon/master/AbstractLocalTachyonCluster.java index 07a33d4b4e9c..d7c4fdddf5f5 100644 --- a/minicluster/src/main/java/tachyon/master/AbstractLocalTachyonCluster.java +++ b/minicluster/src/main/java/tachyon/master/AbstractLocalTachyonCluster.java @@ -39,7 +39,7 @@ import tachyon.worker.WorkerContext; import tachyon.worker.WorkerIdRegistry; import tachyon.worker.block.BlockWorker; -import tachyon.worker.lineage.LineageWorker; +import tachyon.worker.file.FileWorker; /** * Local Tachyon cluster. @@ -60,7 +60,7 @@ public abstract class AbstractLocalTachyonCluster { protected TachyonConf mWorkerConf; protected BlockWorker mWorker; - protected LineageWorker mLineageWorker; + protected FileWorker mLineageWorker; protected UnderFileSystemCluster mUfsCluster; protected String mTachyonHome; @@ -395,7 +395,7 @@ protected void runWorker() throws IOException, ConnectionFailedException { if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) { // Setup the lineage worker LOG.info("Started lineage worker at worker with ID {}", WorkerIdRegistry.getWorkerId()); - mLineageWorker = new LineageWorker(mWorker.getBlockDataManager()); + mLineageWorker = new FileWorker(mWorker.getBlockDataManager()); } Runnable runWorker = new Runnable() { diff --git a/minicluster/src/main/java/tachyon/master/LocalTachyonCluster.java b/minicluster/src/main/java/tachyon/master/LocalTachyonCluster.java index 69968e2dfaeb..f73611c6a4a9 100644 --- a/minicluster/src/main/java/tachyon/master/LocalTachyonCluster.java +++ b/minicluster/src/main/java/tachyon/master/LocalTachyonCluster.java @@ -26,7 +26,7 @@ import tachyon.util.LineageUtils; import tachyon.worker.WorkerContext; import tachyon.worker.block.BlockWorker; -import tachyon.worker.lineage.LineageWorker; +import tachyon.worker.file.FileWorker; /** * Local Tachyon cluster for integration tests. @@ -83,7 +83,7 @@ public BlockWorker getWorker() { return mWorker; } - public LineageWorker getLineageWorker() { + public FileWorker getLineageWorker() { return mLineageWorker; } diff --git a/servers/src/main/java/tachyon/worker/TachyonWorker.java b/servers/src/main/java/tachyon/worker/TachyonWorker.java index f7f9e1ca6461..1c4abe51546f 100644 --- a/servers/src/main/java/tachyon/worker/TachyonWorker.java +++ b/servers/src/main/java/tachyon/worker/TachyonWorker.java @@ -21,7 +21,7 @@ import tachyon.Constants; import tachyon.util.LineageUtils; import tachyon.worker.block.BlockWorker; -import tachyon.worker.lineage.LineageWorker; +import tachyon.worker.file.FileWorker; /** * Entry point for the Tachyon Worker. This class is responsible for initializing the different @@ -39,15 +39,13 @@ public final class TachyonWorker { public static void main(String[] args) { checkArgs(args); BlockWorker worker = null; - LineageWorker lineageWorker = null; + FileWorker fileWorker = null; try { worker = new BlockWorker(); - if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) { - // Setup the lineage worker - LOG.info("Started lineage worker at worker with ID {}", WorkerIdRegistry.getWorkerId()); - lineageWorker = new LineageWorker(worker.getBlockDataManager()); - } + // Setup the file worker + LOG.info("Started file worker at worker with ID {}", WorkerIdRegistry.getWorkerId()); + fileWorker = new FileWorker(worker.getBlockDataManager()); } catch (Exception e) { LOG.error("Failed to initialize the block worker, exiting.", e); @@ -55,10 +53,8 @@ public static void main(String[] args) { } try { - // Start the lineage worker - if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) { - lineageWorker.start(); - } + // Start the file worker + fileWorker.start(); worker.process(); } catch (Exception e) { @@ -66,7 +62,7 @@ public static void main(String[] args) { try { worker.stop(); if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) { - lineageWorker.stop(); + fileWorker.stop(); } } catch (Exception ex) { LOG.error("Failed to stop block worker. Exiting.", ex); diff --git a/servers/src/main/java/tachyon/worker/lineage/LineageDataManager.java b/servers/src/main/java/tachyon/worker/file/FileDataManager.java similarity index 96% rename from servers/src/main/java/tachyon/worker/lineage/LineageDataManager.java rename to servers/src/main/java/tachyon/worker/file/FileDataManager.java index b78a143492cb..388fdc14777b 100644 --- a/servers/src/main/java/tachyon/worker/lineage/LineageDataManager.java +++ b/servers/src/main/java/tachyon/worker/file/FileDataManager.java @@ -13,7 +13,7 @@ * the License. */ -package tachyon.worker.lineage; +package tachyon.worker.file; import java.io.IOException; import java.io.OutputStream; @@ -43,9 +43,9 @@ import tachyon.worker.block.io.BlockReader; /** - * Responsible for managing the lineage storing into under file system. + * Responsible for managing the file storing into under file system. */ -public final class LineageDataManager { +public final class FileDataManager { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private final UnderFileSystem mUfs; @@ -54,7 +54,7 @@ public final class LineageDataManager { private final List mPersistedFiles; private final TachyonConf mTachyonConf; - public LineageDataManager(BlockDataManager blockDataManager) { + public FileDataManager(BlockDataManager blockDataManager) { mBlockDataManager = Preconditions.checkNotNull(blockDataManager); mPersistedFiles = Lists.newArrayList(); mTachyonConf = WorkerContext.getConf(); diff --git a/servers/src/main/java/tachyon/worker/file/FileSystemMasterClient.java b/servers/src/main/java/tachyon/worker/file/FileSystemMasterClient.java index ea092f6d7534..9fc253b3337b 100644 --- a/servers/src/main/java/tachyon/worker/file/FileSystemMasterClient.java +++ b/servers/src/main/java/tachyon/worker/file/FileSystemMasterClient.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; import java.util.Set; import org.apache.thrift.TException; @@ -30,6 +31,7 @@ import tachyon.exception.TachyonException; import tachyon.thrift.FileInfo; import tachyon.thrift.FileSystemMasterWorkerService; +import tachyon.thrift.PersistCommand; import tachyon.thrift.TachyonService; /** @@ -101,4 +103,23 @@ public Set call() throws TException { } }); } + + /** + * Instructs a worker to persist the files. + * + * @param workerId the id of the worker that heartbeats + * @param persistedFiles the persisted files + * @return the command for persisting the blocks of a file + * @throws IOException if file persistence fails + * @throws ConnectionFailedException if network connection failed + */ + public synchronized PersistCommand heartbeat(final long workerId, + final List persistedFiles) throws ConnectionFailedException, IOException { + return retryRPC(new RpcCallable() { + @Override + public PersistCommand call() throws TException { + return mClient.heartbeat(workerId, persistedFiles); + } + }); + } } diff --git a/servers/src/main/java/tachyon/worker/lineage/LineageWorker.java b/servers/src/main/java/tachyon/worker/file/FileWorker.java similarity index 70% rename from servers/src/main/java/tachyon/worker/lineage/LineageWorker.java rename to servers/src/main/java/tachyon/worker/file/FileWorker.java index 7cbadf34c73e..dbf5bc923ca8 100644 --- a/servers/src/main/java/tachyon/worker/lineage/LineageWorker.java +++ b/servers/src/main/java/tachyon/worker/file/FileWorker.java @@ -13,7 +13,7 @@ * the License. */ -package tachyon.worker.lineage; +package tachyon.worker.file; import java.io.IOException; import java.util.concurrent.Executors; @@ -34,38 +34,36 @@ import tachyon.worker.block.BlockDataManager; /** - * This class is responsible for managing all top level components of the lineage worker. + * This class is responsible for managing all top level components of the file worker. */ -public final class LineageWorker extends WorkerBase { - /** Logic for managing lineage file persistence */ - private final LineageDataManager mLineageDataManager; - /** Threadpool for the lineage master sync */ - /** Client for lineage master communication. */ - private final LineageMasterClient mLineageMasterWorkerClient; +public final class FileWorker extends WorkerBase { + /** Logic for managing file persistence */ + private final FileDataManager mFileDataManager; + /** Client for file system master communication. */ + private final FileSystemMasterClient mFileSystemMasterWorkerClient; /** Configuration object */ private final TachyonConf mTachyonConf; - /** The service that persists files for lineage checkpointing */ + /** The service that persists files */ private Future mFilePersistenceService; - public LineageWorker(BlockDataManager blockDataManager) throws IOException { - super(Executors.newFixedThreadPool(3, ThreadFactoryUtils.build("lineage-worker-heartbeat-%d", - true))); + public FileWorker(BlockDataManager blockDataManager) throws IOException { + super(Executors.newFixedThreadPool(3, + ThreadFactoryUtils.build("file-worker-heartbeat-%d", true))); Preconditions.checkState(WorkerIdRegistry.getWorkerId() != 0, "Failed to register worker"); mTachyonConf = WorkerContext.getConf(); - mLineageDataManager = - new LineageDataManager(Preconditions.checkNotNull(blockDataManager)); + mFileDataManager = new FileDataManager(Preconditions.checkNotNull(blockDataManager)); // Setup MasterClientBase - mLineageMasterWorkerClient = new LineageMasterClient( + mFileSystemMasterWorkerClient = new FileSystemMasterClient( NetworkAddressUtils.getConnectAddress(ServiceType.MASTER_RPC, mTachyonConf), mTachyonConf); } public void start() { mFilePersistenceService = getExecutorService().submit(new HeartbeatThread(HeartbeatContext.WORKER_LINEAGE_SYNC, - new LineageWorkerMasterSyncExecutor(mLineageDataManager, mLineageMasterWorkerClient), + new FileWorkerMasterSyncExecutor(mFileDataManager, mFileSystemMasterWorkerClient), mTachyonConf.getInt(Constants.WORKER_LINEAGE_HEARTBEAT_INTERVAL_MS))); } @@ -73,7 +71,7 @@ public void stop() { if (mFilePersistenceService != null) { mFilePersistenceService.cancel(true); } - mLineageMasterWorkerClient.close(); + mFileSystemMasterWorkerClient.close(); getExecutorService().shutdown(); } } diff --git a/servers/src/main/java/tachyon/worker/lineage/LineageWorkerMasterSyncExecutor.java b/servers/src/main/java/tachyon/worker/file/FileWorkerMasterSyncExecutor.java similarity index 68% rename from servers/src/main/java/tachyon/worker/lineage/LineageWorkerMasterSyncExecutor.java rename to servers/src/main/java/tachyon/worker/file/FileWorkerMasterSyncExecutor.java index 1db21f310549..fa0971d25706 100644 --- a/servers/src/main/java/tachyon/worker/lineage/LineageWorkerMasterSyncExecutor.java +++ b/servers/src/main/java/tachyon/worker/file/FileWorkerMasterSyncExecutor.java @@ -13,7 +13,7 @@ * the License. */ -package tachyon.worker.lineage; +package tachyon.worker.file; import java.io.IOException; import java.util.List; @@ -28,50 +28,50 @@ import tachyon.Constants; import tachyon.exception.ConnectionFailedException; import tachyon.heartbeat.HeartbeatExecutor; -import tachyon.thrift.CheckpointFile; import tachyon.thrift.CommandType; -import tachyon.thrift.LineageCommand; +import tachyon.thrift.PersistCommand; +import tachyon.thrift.PersistFile; import tachyon.worker.WorkerIdRegistry; import tachyon.worker.block.BlockMasterSync; /** - * Class that communicates to lineage master via heartbeat. This class manages its own - * {@link LineageMasterClient}. + * Class that communicates to file system master via heartbeat. This class manages its own + * {@link FileSystemMasterClient}. * - * When running, this class pulls from the master to check which file to persist for lineage - * checkpointing. + * When running, this class pulls from the master to check which file to persist for async + * persistence. * * If the task fails to heartbeat to the master, it will destroy its old master client and recreate * it before retrying. * * TODO(yupeng): merge this with {@link BlockMasterSync} to use a central command pattern. */ -final class LineageWorkerMasterSyncExecutor implements HeartbeatExecutor { +final class FileWorkerMasterSyncExecutor implements HeartbeatExecutor { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final int DEFAULT_FILE_PERSISTER_POOL_SIZE = 10; - /** Logic for managing lineage file persistence */ - private final LineageDataManager mLineageDataManager; - /** Client for communicating to lineage master */ - private final LineageMasterClient mMasterClient; + /** Logic for managing async file persistence */ + private final FileDataManager mFileDataManager; + /** Client for communicating to file system master */ + private final FileSystemMasterClient mMasterClient; /** The thread pool to persist file */ private final ExecutorService mFixedExecutionService = Executors.newFixedThreadPool(DEFAULT_FILE_PERSISTER_POOL_SIZE); - public LineageWorkerMasterSyncExecutor(LineageDataManager lineageDataManager, - LineageMasterClient masterClient) { - mLineageDataManager = Preconditions.checkNotNull(lineageDataManager); + public FileWorkerMasterSyncExecutor(FileDataManager fileDataManager, + FileSystemMasterClient masterClient) { + mFileDataManager = Preconditions.checkNotNull(fileDataManager); mMasterClient = Preconditions.checkNotNull(masterClient); } @Override public void heartbeat() { - List persistedFiles = mLineageDataManager.popPersistedFiles(); + List persistedFiles = mFileDataManager.popPersistedFiles(); if (!persistedFiles.isEmpty()) { LOG.info("files {} persisted", persistedFiles); } - LineageCommand command = null; + PersistCommand command = null; try { command = mMasterClient.heartbeat(WorkerIdRegistry.getWorkerId(), persistedFiles); @@ -82,9 +82,9 @@ public void heartbeat() { } Preconditions.checkState(command.commandType == CommandType.Persist); - for (CheckpointFile checkpointFile : command.checkpointFiles) { - mFixedExecutionService.execute(new FilePersister(mLineageDataManager, checkpointFile.fileId, - checkpointFile.blockIds)); + for (PersistFile persistFile : command.persistFiles) { + mFixedExecutionService.execute(new FilePersister(mFileDataManager, persistFile.fileId, + persistFile.blockIds)); } } @@ -92,12 +92,12 @@ public void heartbeat() { * Thread to persist a file into under file system. */ class FilePersister implements Runnable { - private LineageDataManager mLineageDataManager; + private FileDataManager mFileDataManager; private long mFileId; private List mBlockIds; - public FilePersister(LineageDataManager lineageDataManager, long fileId, List blockIds) { - mLineageDataManager = lineageDataManager; + public FilePersister(FileDataManager fileDataManager, long fileId, List blockIds) { + mFileDataManager = fileDataManager; mFileId = fileId; mBlockIds = blockIds; } @@ -106,7 +106,7 @@ public FilePersister(LineageDataManager lineageDataManager, long fileId, List persistedFiles) throws ConnectionFailedException, IOException { - return retryRPC(new RpcCallable() { - @Override - public LineageCommand call() throws TException { - return mClient.heartbeat(workerId, persistedFiles); - } - }); - } -} diff --git a/servers/src/test/java/tachyon/worker/lineage/LineageDataManagerTest.java b/servers/src/test/java/tachyon/worker/lineage/LineageDataManagerTest.java index 3e444564a166..3a5f93873376 100644 --- a/servers/src/test/java/tachyon/worker/lineage/LineageDataManagerTest.java +++ b/servers/src/test/java/tachyon/worker/lineage/LineageDataManagerTest.java @@ -40,9 +40,10 @@ import tachyon.util.io.PathUtils; import tachyon.worker.block.BlockDataManager; import tachyon.worker.block.io.BlockReader; +import tachyon.worker.file.FileDataManager; /** - * Tests {@link LineageDataManager}. + * Tests {@link FileDataManager}. */ @RunWith(PowerMockRunner.class) @PrepareForTest({BlockDataManager.class, BufferUtils.class}) @@ -68,7 +69,7 @@ public void persistFileTest() throws Exception { .thenReturn(reader); } - LineageDataManager manager = new LineageDataManager(blockDataManager); + FileDataManager manager = new FileDataManager(blockDataManager); // mock ufs UnderFileSystem ufs = Mockito.mock(UnderFileSystem.class); @@ -99,7 +100,7 @@ public void persistFileTest() throws Exception { @SuppressWarnings("unchecked") public void popPersistedFilesTest() { BlockDataManager blockDataManager = Mockito.mock(BlockDataManager.class); - LineageDataManager manager = new LineageDataManager(blockDataManager); + FileDataManager manager = new FileDataManager(blockDataManager); List persistedFiles = Lists.newArrayList(1L, 2L); Whitebox.setInternalState(manager, "mPersistedFiles", Lists.newArrayList(persistedFiles));