diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwapBlockListOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSwapBlockListOp.java
similarity index 87%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwapBlockListOp.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSwapBlockListOp.java
index 7c02fbb204ed5..bd459836f6245 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwapBlockListOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSwapBlockListOp.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
@@ -35,13 +36,14 @@
* of a file header, which is useful for client operations like converting
* replicated to EC file.
*/
-public final class SwapBlockListOp {
+public final class FSDirSwapBlockListOp {
- private SwapBlockListOp() {
+ private FSDirSwapBlockListOp() {
}
static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
- String src, String dst, long genTimestamp)
+ String src, String dst, long genTimestamp,
+ final boolean logRetryCache)
throws IOException {
final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE);
@@ -61,6 +63,8 @@ static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
} finally {
fsd.writeUnlock();
}
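+    // Record the swap in the edit log so it is durable across restarts
+    // and can be replayed on the standby NameNode.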
+ fsd.getEditLog().logSwapBlockList(srcIIP.getPath(), dstIIP.getPath(),
+ genTimestamp, logRetryCache);
return result;
}
@@ -155,6 +159,25 @@ private static void validateInode(INodesInPath srcIIP)
}
}
+  /**
+   * Swap the block lists of src and dst while replaying the edit log.
+   *
+   * Note: This is to be used by {@link FSEditLogLoader} only.
+   *
+   * @param fsd FSDirectory
+   * @param src source path
+   * @param dst destination path
+   * @param timestamp generation timestamp recorded with the op
+   * @throws IOException if either path cannot be resolved
+   */
+ static void swapBlockListForEditLog(
+ FSDirectory fsd, String src, String dst, long timestamp)
+ throws IOException {
+ final INodesInPath srcIIP = fsd.getINodesInPath(src, DirOp.WRITE);
+ final INodesInPath dstIIP = fsd.getINodesInPath(dst, DirOp.WRITE);
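+    // Replay path: unlike swapBlocks() above, no permission checks are
+    // done and nothing is written back to the edit log.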
+ swapBlockList(fsd, srcIIP, dstIIP, timestamp);
+ }
+
static class SwapBlockListResult {
private final boolean success;
private final FileStatus srcFileAuditStat;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index be8c684f01508..92863e6ecf494 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -91,6 +91,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SwapBlockListOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
@@ -1295,6 +1296,21 @@ void logRemoveErasureCodingPolicy(String ecPolicyName, boolean toLogRpcIds) {
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
+
+  /**
+   * Add a SwapBlockList record to the edit log.
+   */
+ void logSwapBlockList(String src, String dst, long timestamp,
+ boolean toLogRpcIds) {
+ SwapBlockListOp op = SwapBlockListOp.getInstance(cache.get())
+ .setSource(src)
+ .setDestination(dst)
+ .setTimestamp(timestamp);
+ logRpcIds(op, toLogRpcIds);
+ logEdit(op);
+ }
+
/**
* Get all the journals this edit log is currently operating on.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 294296d2d36d5..1bf28a5d4eedb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -90,6 +90,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SwapBlockListOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
@@ -1049,6 +1050,14 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
+ case OP_SWAP_BLOCK_LIST:
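+      // Re-apply a block list swap recorded by the active NameNode.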
+ SwapBlockListOp swapOp = (SwapBlockListOp) op;
+ FSDirSwapBlockListOp.swapBlockListForEditLog(fsDir, swapOp.src,
+ swapOp.dst, swapOp.timestamp);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+ }
+ break;
default:
throw new IOException("Invalid operation read " + op.opCode);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 963628f9ac4e3..f65b9c3f5e2dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -17,59 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_ERASURE_CODING_POLICY;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISABLE_ERASURE_CODING_POLICY;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ENABLE_ERASURE_CODING_POLICY;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_ERASURE_CODING_POLICY;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_FINALIZE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_START;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_ACL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XATTR;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TRUNCATE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA_BY_STORAGETYPE;
-
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
@@ -143,6 +90,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+
/**
* Helper classes for reading the ops from an InputStream.
* All ops derive from FSEditLogOp and are only
@@ -4910,6 +4859,105 @@ static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
}
}
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#swapBlockList} */
+ static class SwapBlockListOp extends FSEditLogOp {
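+    // Identifies the swap by the two paths plus the generation stamp
+    // used to validate dst's last block when the op is replayed.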
+ String src;
+ String dst;
+ long timestamp;
+
+ SwapBlockListOp() {
+ super(OP_SWAP_BLOCK_LIST);
+ }
+
+ static SwapBlockListOp getInstance(OpInstanceCache cache) {
+ return (SwapBlockListOp) cache.get(OP_SWAP_BLOCK_LIST);
+ }
+
+ @Override
+ void resetSubFields() {
+ src = null;
+ dst = null;
+ timestamp = 0L;
+ }
+
+ SwapBlockListOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SwapBlockListOp setDestination(String dst) {
+ this.dst = dst;
+ return this;
+ }
+
+ SwapBlockListOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+ FSImageSerialization.writeString(src, out);
+ FSImageSerialization.writeString(dst, out);
+ FSImageSerialization.writeLong(timestamp, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.src = FSImageSerialization.readString(in);
+ this.dst = FSImageSerialization.readString(in);
+
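+      // This op postdates EDITLOG_OP_OPTIMIZATION, so the optimized
+      // branch is the one taken in practice; the fallback mirrors the
+      // pattern of older ops.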
+ if (NameNodeLayoutVersion.supports(
+ LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+ this.timestamp = FSImageSerialization.readLong(in);
+ } else {
+ this.timestamp = readLong(in);
+ }
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("SwapBlockListOp [src=")
+ .append(src)
+ .append(", dst=")
+ .append(dst)
+ .append(", timestamp=")
+ .append(timestamp);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+ builder.append(", opCode=")
+ .append(opCode)
+ .append(", txid=")
+ .append(txid)
+ .append("]");
+ return builder.toString();
+ }
+
+ @Override
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "SRC", src);
+ XMLUtils.addSaxString(contentHandler, "DST", dst);
+ XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
+ Long.toString(timestamp));
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+ }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.src = st.getValue("SRC");
+      this.dst = st.getValue("DST");
+      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
+      readRpcIdsFromXml(st);
+    }
+ }
+
/**
* Class for writing editlog ops
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index ce42e3faffe94..fc642339b28bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -85,6 +85,7 @@ public enum FSEditLogOpCodes {
OP_DISABLE_ERASURE_CODING_POLICY((byte) 51,
DisableErasureCodingPolicyOp.class),
OP_REMOVE_ERASURE_CODING_POLICY((byte) 52, RemoveErasureCodingPolicyOp.class),
+ OP_SWAP_BLOCK_LIST((byte) 53, SwapBlockListOp.class),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index cd3ec885b4550..78f5bb836d399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -112,7 +112,7 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
-import org.apache.hadoop.hdfs.server.namenode.SwapBlockListOp.SwapBlockListResult;
+import org.apache.hadoop.hdfs.server.namenode.FSDirSwapBlockListOp.SwapBlockListResult;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
@@ -135,7 +135,6 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.Files;
-import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -8507,9 +8506,11 @@ public void checkErasureCodingSupported(String operationName)
* @param dst destination file.
* @throws IOException on Error.
*/
- boolean swapBlockList(final String src, final String dst, long genTimestamp)
+ boolean swapBlockList(final String src, final String dst, long genTimestamp,
+ boolean logRetryCache)
throws IOException {
final String operationName = "swapBlockList";
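+    // Fail fast if the effective layout version (e.g. during a rolling
+    // upgrade) cannot yet persist this op.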
+ requireEffectiveLayoutVersionForFeature(Feature.SWAP_BLOCK_LIST);
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
SwapBlockListResult res = null;
@@ -8518,7 +8519,8 @@ boolean swapBlockList(final String src, final String dst, long genTimestamp)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot swap block list." + src + ", " + dst);
- res = SwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp);
+ res = FSDirSwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp,
+ logRetryCache);
} finally {
writeUnlock(operationName);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index 297ca74c5e111..198fd8dbfa011 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -89,7 +89,8 @@ public enum Feature implements LayoutFeature {
APPEND_NEW_BLOCK(-62, -61, "Support appending to new block"),
QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"),
ERASURE_CODING(-64, -61, "Support erasure coding"),
- EXPANDED_STRING_TABLE(-65, -61, "Support expanded string table in fsimage");
+ EXPANDED_STRING_TABLE(-65, -61, "Support expanded string table in fsimage"),
+ SWAP_BLOCK_LIST(-66, -66, "Support swapBlockList Op");
private final FeatureInfo info;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7e480c5fd250e..6ef7d69775201 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2674,7 +2674,18 @@ public boolean swapBlockList(String src, String dst, long maxTimestamp)
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.swapBlockList: {} and {}", src, dst);
}
- return namesystem.swapBlockList(src, dst, maxTimestamp);
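+    // Consult the retry cache first so a retried RPC does not repeat
+    // the swap.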
+ final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true;
+ }
+ boolean ret = false;
+ try {
+      ret = namesystem.swapBlockList(src, dst, maxTimestamp,
+          cacheEntry != null);
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
+ }
+ return ret;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java
index d29fdee37969a..4a482ea3c743a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java
@@ -18,25 +18,27 @@
package org.apache.hadoop.hdfs.server.namenode;
-import static org.junit.Assert.assertEquals;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.*;
+
/**
* Test SwapBlockListOp working.
*/
@@ -57,6 +59,9 @@ public class TestSwapBlockList {
private final Path subDir2 = new Path(rootDir, "dir2");
private final Path file4 = new Path(subDir2, "file4");
+ private final Path subDir3 = new Path(rootDir, "dir3");
+ private final Path file5 = new Path(subDir3, "file5");
+
private Configuration conf;
private MiniDFSCluster cluster;
private FSNamesystem fsn;
@@ -69,7 +74,7 @@ public void setUp() throws Exception {
conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY, 2);
cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION)
+      .numDataNodes(9) // the default RS(6,3) EC policy needs 9 DataNodes
.build();
cluster.waitActive();
@@ -79,12 +84,18 @@ public void setUp() throws Exception {
hdfs = cluster.getFileSystem();
hdfs.mkdirs(subDir2);
+ hdfs.mkdirs(subDir3);
DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, SEED);
DFSTestUtil.createFile(hdfs, file2, 1024, REPLICATION, SEED);
DFSTestUtil.createFile(hdfs, file3, 1024, REPLICATION, SEED);
DFSTestUtil.createFile(hdfs, file4, 1024, REPLICATION, SEED);
-
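+    // dir3 holds a striped (EC) file so swaps across block layouts are
+    // covered.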
+ fsn.enableErasureCodingPolicy(
+ StripedFileTestUtil.getDefaultECPolicy().getName(), false);
+ hdfs.getClient().setErasureCodingPolicy(subDir3.toString(),
+ StripedFileTestUtil.getDefaultECPolicy().getName());
+ DFSTestUtil.createStripedFile(cluster, file5, null,
+ 4, 4, false);
}
@After
@@ -101,19 +112,19 @@ public void testInputValidation() throws Exception {
LambdaTestUtils.intercept(FileNotFoundException.class,
"/TestSwapBlockList/dir1/fileXYZ", () -> fsn.swapBlockList(
"/TestSwapBlockList/dir1/fileXYZ", "/TestSwapBlockList/dir1/dir11" +
- "/file3", 0L));
+ "/file3", 0L, false));
// Destination file not found.
LambdaTestUtils.intercept(FileNotFoundException.class,
"/TestSwapBlockList/dir1/dir11/fileXYZ",
() -> fsn.swapBlockList("/TestSwapBlockList/dir1/file1",
- "/TestSwapBlockList/dir1/dir11/fileXYZ", 0L));
+ "/TestSwapBlockList/dir1/dir11/fileXYZ", 0L, false));
// Source is Directory, not a file.
LambdaTestUtils.intercept(IOException.class,
"/TestSwapBlockList/dir1 is not a file.",
() -> fsn.swapBlockList("/TestSwapBlockList/dir1",
- "/TestSwapBlockList/dir1/dir11/file3", 0L));
+ "/TestSwapBlockList/dir1/dir11/file3", 0L, false));
String sourceFile = "/TestSwapBlockList/dir1/file1";
String dstFile1 = "/TestSwapBlockList/dir1/dir11/file3";
@@ -124,14 +135,14 @@ public void testInputValidation() throws Exception {
dstInodeFile.toUnderConstruction("TestClient", "TestClientMachine");
LambdaTestUtils.intercept(IOException.class,
dstFile1 + " is under construction.",
- () -> fsn.swapBlockList(sourceFile, dstFile1, 0L));
+ () -> fsn.swapBlockList(sourceFile, dstFile1, 0L, false));
// Check if parent directory is in snapshot.
SnapshotTestHelper.createSnapshot(hdfs, subDir2, "s0");
String dstFile2 = "/TestSwapBlockList/dir2/file4";
LambdaTestUtils.intercept(IOException.class,
dstFile2 + " is in a snapshot directory",
- () -> fsn.swapBlockList(sourceFile, dstFile2, 0L));
+ () -> fsn.swapBlockList(sourceFile, dstFile2, 0L, false));
// Check if gen timestamp validation works.
String dstFile3 = "/TestSwapBlockList/dir1/file2";
@@ -141,7 +152,7 @@ public void testInputValidation() throws Exception {
dstInodeFile.getLastBlock().setGenerationStamp(genStamp + 1);
LambdaTestUtils.intercept(IOException.class,
dstFile3 + " has last block with different gen timestamp.",
- () -> fsn.swapBlockList(sourceFile, dstFile3, genStamp));
+ () -> fsn.swapBlockList(sourceFile, dstFile3, genStamp, false));
}
@Test
@@ -163,7 +174,7 @@ public void testSwapBlockListOp() throws Exception {
long dstHeader = dstInodeFile.getHeaderLong();
fsn.swapBlockList(sourceFile, dstFile,
- dstInodeFile.getLastBlock().getGenerationStamp());
+ dstInodeFile.getLastBlock().getGenerationStamp(), false);
assertBlockListEquality(dstBlockLocationsBeforeSwap,
srcInodeFile.getBlocks(), srcInodeFile.getId());
assertBlockListEquality(srcBlockLocationsBeforeSwap,
@@ -217,6 +228,113 @@ public void testSwapBlockListOpRollback() throws Exception {
HeaderFormat.getBlockLayoutPolicy(dstInodeFile.getHeaderLong()));
}
+ @Test
+  public void testSwapBlockListEditLog() throws Exception {
+ // start a cluster
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
+ .build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ FSNamesystem fns = cluster.getNamesystem();
+ FSDirectory fsd = fns.getFSDirectory();
+      ErasureCodingPolicy testECPolicy =
+          StripedFileTestUtil.getDefaultECPolicy();
+ fns.enableErasureCodingPolicy(testECPolicy.getName(), false);
+
+ final Path rootDir = new Path("/" + getClass().getSimpleName());
+      Path srcRepDir = new Path(rootDir, "dir_replica");
+      Path dstECDir = new Path(rootDir, "dir_ec");
+ Path srcFile = new Path(srcRepDir, "file_1");
+ Path dstFile = new Path(dstECDir, "file_2");
+
+ fs.mkdirs(srcRepDir);
+ fs.mkdirs(dstECDir);
+
+ fs.getClient().setErasureCodingPolicy(dstECDir.toString(),
+ testECPolicy.getName());
+
+ DFSTestUtil.createFile(fs, srcFile, 1024, (short) 3, 1);
+
+ DFSTestUtil.createStripedFile(cluster, dstFile, null,
+ 4, 4, false, testECPolicy);
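+      // file_1 is a 3x-replicated file; file_2 is striped under the EC
+      // policy, so the swap crosses block layouts.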
+
+ INodeFile srcInodeFile =
+ (INodeFile) fsd.resolvePath(fsd.getPermissionChecker(),
+ srcFile.toString(), FSDirectory.DirOp.WRITE).getLastINode();
+ INodeFile dstInodeFile =
+ (INodeFile) fsd.resolvePath(fsd.getPermissionChecker(),
+ dstFile.toString(), FSDirectory.DirOp.WRITE).getLastINode();
+
+ BlockInfo[] srcBlockLocationsBeforeSwap = srcInodeFile.getBlocks();
+ long srcHeader = srcInodeFile.getHeaderLong();
+
+ BlockInfo[] dstBlockLocationsBeforeSwap = dstInodeFile.getBlocks();
+ long dstHeader = dstInodeFile.getHeaderLong();
+
+      // Swap block lists between the replicated file and the EC file.
+ fns.swapBlockList(srcFile.toString(), dstFile.toString(),
+ dstInodeFile.getLastBlock().getGenerationStamp(), false);
+
+ // Assert Block Id
+ assertBlockListEquality(dstBlockLocationsBeforeSwap,
+ srcInodeFile.getBlocks(), srcInodeFile.getId());
+ assertBlockListEquality(srcBlockLocationsBeforeSwap,
+ dstInodeFile.getBlocks(), dstInodeFile.getId());
+
+ // Assert Block Layout
+ assertEquals(HeaderFormat.getBlockLayoutPolicy(srcHeader),
+ HeaderFormat.getBlockLayoutPolicy(dstInodeFile.getHeaderLong()));
+ assertEquals(HeaderFormat.getBlockLayoutPolicy(dstHeader),
+ HeaderFormat.getBlockLayoutPolicy(srcInodeFile.getHeaderLong()));
+
+ // Assert Storage policy
+ assertEquals(HeaderFormat.getStoragePolicyID(srcHeader),
+ HeaderFormat.getStoragePolicyID(dstInodeFile.getHeaderLong()));
+ assertEquals(HeaderFormat.getStoragePolicyID(dstHeader),
+ HeaderFormat.getStoragePolicyID(srcInodeFile.getHeaderLong()));
+
+      // After the NameNode restarts, the namespace is rebuilt from the
+      // fsimage plus the edit log. If the loaded inodes still show the
+      // swapped block lists and headers, the edit log op was applied
+      // correctly on replay.
+ cluster.restartNameNodes();
+ cluster.waitActive();
+ fns = cluster.getNamesystem();
+
+ INodeFile srcInodeLoaded = (INodeFile)fns.getFSDirectory()
+ .getINode(srcFile.toString());
+
+ INodeFile dstInodeLoaded = (INodeFile)fns.getFSDirectory()
+ .getINode(dstFile.toString());
+
+ // Assert Block Id
+ assertBlockListEquality(dstBlockLocationsBeforeSwap,
+ srcInodeLoaded.getBlocks(), srcInodeLoaded.getId());
+ assertBlockListEquality(srcBlockLocationsBeforeSwap,
+ dstInodeLoaded.getBlocks(), dstInodeLoaded.getId());
+
+ // Assert Block Layout
+ assertEquals(HeaderFormat.getBlockLayoutPolicy(srcHeader),
+ HeaderFormat.getBlockLayoutPolicy(dstInodeLoaded.getHeaderLong()));
+ assertEquals(HeaderFormat.getBlockLayoutPolicy(dstHeader),
+ HeaderFormat.getBlockLayoutPolicy(srcInodeLoaded.getHeaderLong()));
+
+ // Assert Storage policy
+ assertEquals(HeaderFormat.getStoragePolicyID(srcHeader),
+ HeaderFormat.getStoragePolicyID(dstInodeLoaded.getHeaderLong()));
+ assertEquals(HeaderFormat.getStoragePolicyID(dstHeader),
+ HeaderFormat.getStoragePolicyID(srcInodeLoaded.getHeaderLong()));
+
+ cluster.shutdown();
+ cluster = null;
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
private void assertBlockListEquality(BlockInfo[] expected,
BlockInfo[] actual,
long expectedId) {