Skip to content

Commit

Permalink
Merge branch 'trunk' of github.com:apache/hadoop into HADOOP-18756-trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
virajjasani committed Jun 7, 2023
2 parents 0adac72 + 1dbaba8 commit 7dd24ba
Show file tree
Hide file tree
Showing 32 changed files with 671 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
Expand Down Expand Up @@ -71,6 +73,16 @@ public class SingleFilePerBlockCache implements BlockCache {

private final PrefetchingStatistics prefetchingStatistics;

/**
* Timeout to be used by close, while acquiring prefetch block write lock.
*/
private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;

/**
* Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
*/
private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;

/**
* File attributes attached to any intermediate temporary file created during index creation.
*/
Expand All @@ -86,12 +98,18 @@ private static final class Entry {
private final Path path;
private final int size;
private final long checksum;
private final ReentrantReadWriteLock lock;
private enum LockType {
READ,
WRITE
}

/**
 * Create a cache entry for a single prefetched block, with its own
 * read/write lock guarding access to the backing cache file.
 *
 * @param blockNumber number (index) of the block held by this entry.
 * @param path local file that stores the cached block data.
 * @param size size of the cached block data in bytes.
 * @param checksum checksum of the cached block data.
 */
Entry(int blockNumber, Path path, int size, long checksum) {
this.blockNumber = blockNumber;
this.path = path;
this.size = size;
this.checksum = checksum;
this.lock = new ReentrantReadWriteLock();
}

@Override
Expand All @@ -100,6 +118,54 @@ public String toString() {
"([%03d] %s: size = %d, checksum = %d)",
blockNumber, path, size, checksum);
}

/**
* Take the read or write lock.
*
* @param lockType type of the lock.
*/
/**
 * Acquire the lock of the requested type, blocking until it is available.
 * An unrecognized lock type is a no-op, matching the original if/else chain.
 *
 * @param lockType type of the lock.
 */
private void takeLock(LockType lockType) {
  switch (lockType) {
  case READ:
    lock.readLock().lock();
    break;
  case WRITE:
    lock.writeLock().lock();
    break;
  default:
    break;
  }
}

/**
* Release the read or write lock.
*
* @param lockType type of the lock.
*/
/**
 * Release the previously acquired lock of the requested type.
 * An unrecognized lock type is a no-op, matching the original if/else chain.
 *
 * @param lockType type of the lock.
 */
private void releaseLock(LockType lockType) {
  switch (lockType) {
  case READ:
    lock.readLock().unlock();
    break;
  case WRITE:
    lock.writeLock().unlock();
    break;
  default:
    break;
  }
}

/**
* Try to take the read or write lock within the given timeout.
*
* @param lockType type of the lock.
* @param timeout the time to wait for the given lock.
* @param unit the time unit of the timeout argument.
* @return true if the lock of the given lock type was acquired.
*/
/**
 * Try to acquire the lock of the requested type within the given timeout.
 * If the waiting thread is interrupted, the interrupt status is restored
 * and the acquisition is reported as failed.
 *
 * @param lockType type of the lock.
 * @param timeout the time to wait for the given lock.
 * @param unit the time unit of the timeout argument.
 * @return true if the lock of the given lock type was acquired.
 */
private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
  try {
    switch (lockType) {
    case READ:
      return lock.readLock().tryLock(timeout, unit);
    case WRITE:
      return lock.writeLock().tryLock(timeout, unit);
    default:
      break;
    }
  } catch (InterruptedException e) {
    LOG.warn("Thread interrupted while trying to acquire {} lock", lockType, e);
    // Restore the interrupt status so callers can observe it.
    Thread.currentThread().interrupt();
  }
  return false;
}
}

/**
Expand Down Expand Up @@ -150,11 +216,15 @@ public void get(int blockNumber, ByteBuffer buffer) throws IOException {
checkNotNull(buffer, "buffer");

Entry entry = getEntry(blockNumber);
buffer.clear();
readFile(entry.path, buffer);
buffer.rewind();

validateEntry(entry, buffer);
entry.takeLock(Entry.LockType.READ);
try {
buffer.clear();
readFile(entry.path, buffer);
buffer.rewind();
validateEntry(entry, buffer);
} finally {
entry.releaseLock(Entry.LockType.READ);
}
}

protected int readFile(Path path, ByteBuffer buffer) throws IOException {
Expand Down Expand Up @@ -202,7 +272,12 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,

if (blocks.containsKey(blockNumber)) {
Entry entry = blocks.get(blockNumber);
validateEntry(entry, buffer);
entry.takeLock(Entry.LockType.READ);
try {
validateEntry(entry, buffer);
} finally {
entry.releaseLock(Entry.LockType.READ);
}
return;
}

Expand Down Expand Up @@ -265,12 +340,22 @@ public void close() throws IOException {
int numFilesDeleted = 0;

for (Entry entry : blocks.values()) {
boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
continue;
}
try {
Files.deleteIfExists(entry.path);
prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++;
} catch (IOException e) {
LOG.debug("Failed to delete cache file {}", entry.path, e);
LOG.error("Failed to delete cache file {}", entry.path, e);
} finally {
entry.releaseLock(Entry.LockType.WRITE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing.DiffReportListingEntry;
Expand Down Expand Up @@ -835,6 +837,36 @@ public static FsStatus toFsStatus(Map<?, ?> json) {
return new FsStatus(capacity, used, remaining);
}

/**
 * Parse the "ErasureCodingPolicies" JSON response into a collection of
 * {@link ErasureCodingPolicyInfo}.
 *
 * <p>Builds the result list directly instead of going through an
 * intermediate array plus {@code Arrays.asList} (which returned a
 * fixed-size list, inconsistent with the mutable empty-case result).
 *
 * @param json JSON response map; expected keys may be absent.
 * @return the parsed policies, or an empty list if none are present.
 */
public static Collection<ErasureCodingPolicyInfo> getAllErasureCodingPolicies(
    Map<?, ?> json) {
  Map<?, ?> erasureCodingPoliciesJson = (Map<?, ?>) json.get("ErasureCodingPolicies");
  if (erasureCodingPoliciesJson != null) {
    List<?> objs = (List<?>) erasureCodingPoliciesJson.get(
        ErasureCodingPolicyInfo.class.getSimpleName());
    if (objs != null) {
      List<ErasureCodingPolicyInfo> erasureCodingPolicies =
          new ArrayList<>(objs.size());
      for (Object obj : objs) {
        final Map<?, ?> m = (Map<?, ?>) obj;
        erasureCodingPolicies.add(toECPolicyInfo(m));
      }
      return erasureCodingPolicies;
    }
  }
  return new ArrayList<ErasureCodingPolicyInfo>(0);
}

/**
 * Convert a JSON map into an {@link ErasureCodingPolicyInfo}.
 *
 * @param m JSON map with "policy" and "state" entries; may be null.
 * @return the parsed policy info, or null if {@code m} is null.
 */
public static ErasureCodingPolicyInfo toECPolicyInfo(Map<?, ?> m) {
  if (m == null) {
    return null;
  }
  ErasureCodingPolicy ecPolicy = toECPolicy((Map<?, ?>) m.get("policy"));
  // The default must be a valid ErasureCodingPolicyState constant
  // (DISABLED/ENABLED/REMOVED): the previous default "DISABLE" made
  // valueOf() throw IllegalArgumentException whenever "state" was absent.
  String state = getString(m, "state", "DISABLED");
  final ErasureCodingPolicyState ecPolicyState = ErasureCodingPolicyState.valueOf(state);
  return new ErasureCodingPolicyInfo(ecPolicy, ecPolicyState);
}

private static List<SnapshotDiffReport.DiffReportEntry> toDiffList(
List<?> objs) {
if (objs == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
Expand Down Expand Up @@ -2192,6 +2193,19 @@ FsStatus decodeResponse(Map<?, ?> json) {
}.run();
}

/**
 * Fetch all erasure coding policies from the NameNode over WebHDFS
 * (GETECPOLICIES), updating read-op statistics before issuing the call.
 *
 * @return the erasure coding policies decoded from the JSON response.
 * @throws IOException if the remote call fails.
 */
public Collection<ErasureCodingPolicyInfo> getAllErasureCodingPolicies()
    throws IOException {
  statistics.incrementReadOps(1);
  storageStatistics.incrementOpCounter(OpType.GET_EC_POLICIES);
  final GetOpParam.Op op = GetOpParam.Op.GETECPOLICIES;
  FsPathResponseRunner<Collection<ErasureCodingPolicyInfo>> runner =
      new FsPathResponseRunner<Collection<ErasureCodingPolicyInfo>>(op, null) {
        @Override
        Collection<ErasureCodingPolicyInfo> decodeResponse(Map<?, ?> json) {
          return JsonUtilClient.getAllErasureCodingPolicies(json);
        }
      };
  return runner.run();
}

@VisibleForTesting
InetSocketAddress[] getResolvedNNAddr() {
return nnAddrs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum Op implements HttpOpParam.Op {
GETLINKTARGET(false, HttpURLConnection.HTTP_OK),
GETFILELINKSTATUS(false, HttpURLConnection.HTTP_OK),
GETSTATUS(false, HttpURLConnection.HTTP_OK),
GETECPOLICIES(false, HttpURLConnection.HTTP_OK),
GETSNAPSHOTLIST(false, HttpURLConnection.HTTP_OK);

final boolean redirect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public synchronized void close(boolean force) {
// this is an erroneous case, but we have to close the connection
// anyway since there will be connection leak if we don't do so
// the connection has been moved out of the pool
LOG.error("Active connection with {} handlers will be closed",
this.numThreads);
LOG.error("Active connection with {} handlers will be closed, ConnectionContext is {}",
this.numThreads, this);
}
this.closed = true;
Object proxy = this.client.getProxy();
Expand All @@ -170,7 +170,10 @@ public String toString() {
Class<?> clazz = proxy.getClass();

StringBuilder sb = new StringBuilder();
sb.append(clazz.getSimpleName())
sb.append("hashcode:")
.append(hashCode())
.append(" ")
.append(clazz.getSimpleName())
.append("@")
.append(addr)
.append("x")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public void run() {
pool.getMaxSize(), pool);
}
} catch (IOException e) {
LOG.error("Cannot create a new connection", e);
LOG.error("Cannot create a new connection for {} {}", pool, e);
}
} catch (InterruptedException e) {
LOG.error("The connection creator was interrupted");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ public synchronized List<ConnectionContext> removeConnections(int num) {
}
this.connections = tmpConnections;
}
LOG.debug("Expected to remove {} connection and actually removed {} connections",
num, removed.size());
LOG.debug("Expected to remove {} connection and actually removed {} connections "
+ "for connectionPool: {}", num, removed.size(), connectionPoolId);
return removed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
for(String src : srcs) {
final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
// permission check for srcs
if (pc != null) {
if (pc != null && fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
Expand Down Expand Up @@ -1407,6 +1408,11 @@ protected Response get(
final String js = JsonUtil.toJsonString(status);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETECPOLICIES: {
ErasureCodingPolicyInfo[] ecPolicyInfos = cp.getErasureCodingPolicies();
final String js = JsonUtil.toJsonString(ecPolicyInfos);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,4 +741,27 @@ public static Map<String, Object> toJsonMap(FsStatus status) {
m.put("remaining", status.getRemaining());
return m;
}

/**
 * Convert an {@link ErasureCodingPolicyInfo} into its JSON map form
 * with "policy" and "state" entries.
 *
 * @param ecPolicyInfo the policy info to convert; may be null.
 * @return the JSON map, or null if {@code ecPolicyInfo} is null.
 */
public static Map<String, Object> toJsonMap(ErasureCodingPolicyInfo ecPolicyInfo) {
  if (ecPolicyInfo == null) {
    return null;
  }
  final Map<String, Object> json = new HashMap<>();
  json.put("policy", ecPolicyInfo.getPolicy());
  json.put("state", ecPolicyInfo.getState());
  return json;
}

/**
 * Serialize an array of {@link ErasureCodingPolicyInfo} into the JSON
 * string used by the GETECPOLICIES response. A null or empty array is
 * serialized with a null policy list, as before.
 *
 * @param ecPolicyInfos the policies to serialize; may be null or empty.
 * @return the JSON string representation.
 */
public static String toJsonString(ErasureCodingPolicyInfo[] ecPolicyInfos) {
  Object[] jsonPolicyInfos = null;
  if (ecPolicyInfos != null && ecPolicyInfos.length > 0) {
    jsonPolicyInfos = new Object[ecPolicyInfos.length];
    int i = 0;
    for (ErasureCodingPolicyInfo info : ecPolicyInfos) {
      jsonPolicyInfos[i++] = toJsonMap(info);
    }
  }
  final Map<String, Object> erasureCodingPolicies = new HashMap<>();
  erasureCodingPolicies.put("ErasureCodingPolicyInfo", jsonPolicyInfos);
  return toJsonString("ErasureCodingPolicies", erasureCodingPolicies);
}
}
Loading

0 comments on commit 7dd24ba

Please sign in to comment.