Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
Expand All @@ -35,13 +36,14 @@
* of a file header, which is useful for client operations like converting
* replicated to EC file.
*/
public final class SwapBlockListOp {
public final class FSDirSwapBlockListOp {

private SwapBlockListOp() {
private FSDirSwapBlockListOp() {
}

static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
String src, String dst, long genTimestamp)
String src, String dst, long genTimestamp,
final boolean logRetryCache)
throws IOException {

final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE);
Expand All @@ -61,6 +63,8 @@ static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logSwapBlockList(srcIIP.getPath(), dstIIP.getPath(),
genTimestamp, logRetryCache);
return result;
}

Expand Down Expand Up @@ -155,6 +159,25 @@ private static void validateInode(INodesInPath srcIIP)
}
}

/**
* swapBlockList src to dst.
* <br>
* Note: This is to be used by {@link org.apache.hadoop.hdfs.server
* .namenode.FSEditLogLoader} only.
* <br>
* @param fsd FSDirectory
* @param src source path
* @param dst destination path
* @param timestamp modification time
*/
static void swapBlockListForEditLog(
FSDirectory fsd, String src, String dst, long timestamp)
throws IOException {
final INodesInPath srcIIP = fsd.getINodesInPath(src, DirOp.WRITE);
final INodesInPath dstIIP = fsd.getINodesInPath(dst, DirOp.WRITE);
swapBlockList(fsd, srcIIP, dstIIP, timestamp);
}

static class SwapBlockListResult {
private final boolean success;
private final FileStatus srcFileAuditStat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SwapBlockListOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
Expand Down Expand Up @@ -1295,6 +1296,21 @@ void logRemoveErasureCodingPolicy(String ecPolicyName, boolean toLogRpcIds) {
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

/**
* Add SwapBlockList record to edit log.
*
*/
void logSwapBlockList(String src, String dst, long timestamp,
boolean toLogRpcIds) {
SwapBlockListOp op = SwapBlockListOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

/**
* Get all the journals this edit log is currently operating on.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SwapBlockListOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
Expand Down Expand Up @@ -1049,6 +1050,14 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
case OP_SWAP_BLOCK_LIST:
SwapBlockListOp swapOp = (SwapBlockListOp) op;
FSDirSwapBlockListOp.swapBlockListForEditLog(fsDir, swapOp.src,
swapOp.dst, swapOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
default:
throw new IOException("Invalid operation read " + op.opCode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISABLE_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ENABLE_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_ERASURE_CODING_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_FINALIZE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_START;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_ACL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XATTR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TRUNCATE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA_BY_STORAGETYPE;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
Expand Down Expand Up @@ -143,6 +90,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;

/**
* Helper classes for reading the ops from an InputStream.
* All ops derive from FSEditLogOp and are only
Expand Down Expand Up @@ -4910,6 +4859,105 @@ static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
}
}

/** {@literal @Idempotent} for {@link ClientProtocol#swapBlockList} */
static class SwapBlockListOp extends FSEditLogOp {
String src;
String dst;
long timestamp;

SwapBlockListOp() {
super(OP_SWAP_BLOCK_LIST);
}

static SwapBlockListOp getInstance(OpInstanceCache cache) {
return (SwapBlockListOp) cache.get(OP_SWAP_BLOCK_LIST);
}

@Override
void resetSubFields() {
src = null;
dst = null;
timestamp = 0L;
}

SwapBlockListOp setSource(String src) {
this.src = src;
return this;
}

SwapBlockListOp setDestination(String dst) {
this.dst = dst;
return this;
}

SwapBlockListOp setTimestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}

@Override
public
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(src, out);
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}

@Override
void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.dst = FSImageSerialization.readString(in);

if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
this.timestamp = FSImageSerialization.readLong(in);
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}


@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("SwapBlockListOp [src=")
.append(src)
.append(", dst=")
.append(dst)
.append(", timestamp=")
.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=")
.append(opCode)
.append(", txid=")
.append(txid)
.append("]");
return builder.toString();
}

@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src);
XMLUtils.addSaxString(contentHandler, "DST", dst);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.toString(timestamp));
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}

@Override void fromXml(Stanza st) throws InvalidXmlException {
this.src = st.getValue("SRC");
this.dst = st.getValue("DST");
this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
String opts = st.getValue("OPTIONS");
String o[] = opts.split("\\|");
readRpcIdsFromXml(st);
}
}

/**
* Class for writing editlog ops
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public enum FSEditLogOpCodes {
OP_DISABLE_ERASURE_CODING_POLICY((byte) 51,
DisableErasureCodingPolicyOp.class),
OP_REMOVE_ERASURE_CODING_POLICY((byte) 52, RemoveErasureCodingPolicyOp.class),
OP_SWAP_BLOCK_LIST((byte) 53, SwapBlockListOp.class),

// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
import org.apache.hadoop.hdfs.server.namenode.SwapBlockListOp.SwapBlockListResult;
import org.apache.hadoop.hdfs.server.namenode.FSDirSwapBlockListOp.SwapBlockListResult;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
Expand All @@ -135,7 +135,6 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.Files;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
Expand Down Expand Up @@ -8507,9 +8506,11 @@ public void checkErasureCodingSupported(String operationName)
* @param dst destination file.
* @throws IOException on Error.
*/
boolean swapBlockList(final String src, final String dst, long genTimestamp)
boolean swapBlockList(final String src, final String dst, long genTimestamp,
boolean logRetryCache)
throws IOException {
final String operationName = "swapBlockList";
requireEffectiveLayoutVersionForFeature(Feature.SWAP_BLOCK_LIST);
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
SwapBlockListResult res = null;
Expand All @@ -8518,7 +8519,8 @@ boolean swapBlockList(final String src, final String dst, long genTimestamp)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot swap block list." + src + ", " + dst);
res = SwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp);
res = FSDirSwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp,
logRetryCache);
} finally {
writeUnlock(operationName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public enum Feature implements LayoutFeature {
APPEND_NEW_BLOCK(-62, -61, "Support appending to new block"),
QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"),
ERASURE_CODING(-64, -61, "Support erasure coding"),
EXPANDED_STRING_TABLE(-65, -61, "Support expanded string table in fsimage");
EXPANDED_STRING_TABLE(-65, -61, "Support expanded string table in fsimage"),
SWAP_BLOCK_LIST(-66, -66, "Support swapBlockList Op");

private final FeatureInfo info;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2674,7 +2674,18 @@ public boolean swapBlockList(String src, String dst, long maxTimestamp)
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.swapBlockList: {} and {}", src, dst);
}
return namesystem.swapBlockList(src, dst, maxTimestamp);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true;
}
boolean ret = false;
try {
ret =
namesystem.swapBlockList(src, dst, maxTimestamp, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret);
}
return ret;
}

}
Loading