Skip to content

Commit

Permalink
HDFS-13603: do not propagate ExecutionException and add maxRetries li…
Browse files Browse the repository at this point in the history
…mit to NameNode edek cache warmup
  • Loading branch information
yzhang559 committed May 7, 2024
1 parent edf985e commit 10d763a
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -947,11 +947,7 @@ public void flush() throws IOException {
@Override
public void warmUpEncryptedKeys(String... keyNames)
throws IOException {
try {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
} catch (ExecutionException e) {
throw new IOException(e);
}
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,23 @@ public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
* Initializes the Value Queues for the provided keys by calling the
* fill Method with "numInitValues" values
* @param keyNames Array of key Names
* @throws ExecutionException executionException.
* @throws IOException if no successful initialization for any key
*/
public void initializeQueuesForKeys(String... keyNames)
throws ExecutionException {
public void initializeQueuesForKeys(String... keyNames) throws IOException {
int successfulInitializations = 0;
ExecutionException lastException = null;

for (String keyName : keyNames) {
keyQueues.get(keyName);
try {
keyQueues.get(keyName);
successfulInitializations++;
} catch (ExecutionException e) {
lastException = e;
}
}

if (keyNames.length > 0 && successfulInitializations == 0) {
throw new IOException("Failed to initialize any queue for the provided keys.", lastException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,8 @@ SyncGenerationPolicy.LOW_WATERMARK, new EncryptedQueueRefiller()
}

@Override
public void warmUpEncryptedKeys(String... keyNames) throws
IOException {
try {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
} catch (ExecutionException e) {
throw new IOException(e);
}
public void warmUpEncryptedKeys(String... keyNames) throws IOException {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
public static final int DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000;
public static final String DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_KEY =
"dfs.namenode.edekcacheloader.max-retries";
public static final int DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_DEFAULT = 10;
public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY = "dfs.namenode.reencrypt.sleep.interval";
public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT = "1m";
public static final String DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY = "dfs.namenode.reencrypt.batch.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,12 +537,12 @@ static boolean isInAnEZ(final FSDirectory fsd, final INodesInPath iip)
* then launch up a separate thread to warm them up.
*/
static void warmUpEdekCache(final ExecutorService executor,
final FSDirectory fsd, final int delay, final int interval) {
final FSDirectory fsd, final int delay, final int interval, final int maxRetries) {
fsd.readLock();
try {
String[] edeks = fsd.ezManager.getKeyNames();
executor.execute(
new EDEKCacheLoader(edeks, fsd.getProvider(), delay, interval));
new EDEKCacheLoader(edeks, fsd.getProvider(), delay, interval, maxRetries));
} finally {
fsd.readUnlock();
}
Expand All @@ -557,19 +557,22 @@ static class EDEKCacheLoader implements Runnable {
private final KeyProviderCryptoExtension kp;
private int initialDelay;
private int retryInterval;
private int maxRetries;

EDEKCacheLoader(final String[] names, final KeyProviderCryptoExtension kp,
final int delay, final int interval) {
final int delay, final int interval, final int maxRetries) {
this.keyNames = names;
this.kp = kp;
this.initialDelay = delay;
this.retryInterval = interval;
this.maxRetries = maxRetries;
}

@Override
public void run() {
NameNode.LOG.info("Warming up {} EDEKs... (initialDelay={}, "
+ "retryInterval={})", keyNames.length, initialDelay, retryInterval);
+ "retryInterval={}, maxRetries={})", keyNames.length, initialDelay, retryInterval,
maxRetries);
try {
Thread.sleep(initialDelay);
} catch (InterruptedException ie) {
Expand All @@ -580,15 +583,15 @@ public void run() {
final int logCoolDown = 10000; // periodically print error log (if any)
int sinceLastLog = logCoolDown; // always print the first failure
boolean success = false;
int retryCount = 0;
IOException lastSeenIOE = null;
long warmUpEDEKStartTime = monotonicNow();
while (true) {

while (!success && retryCount < maxRetries) {
try {
kp.warmUpEncryptedKeys(keyNames);
NameNode.LOG
.info("Successfully warmed up {} EDEKs.", keyNames.length);
NameNode.LOG.info("Successfully warmed up {} EDEKs.", keyNames.length);
success = true;
break;
} catch (IOException ioe) {
lastSeenIOE = ioe;
if (sinceLastLog >= logCoolDown) {
Expand All @@ -601,18 +604,22 @@ public void run() {
NameNode.LOG.error("Cannot warm up EDEKs.", e);
throw e;
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
NameNode.LOG.info("EDEKCacheLoader interrupted during retry.");
break;

if (!success) {
try {
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
NameNode.LOG.info("EDEKCacheLoader interrupted during retry.");
break;
}
retryCount++;
}
sinceLastLog += retryInterval;
}

long warmUpEDEKTime = monotonicNow() - warmUpEDEKStartTime;
NameNode.getNameNodeMetrics().addWarmUpEDEKTime(warmUpEDEKTime);
if (!success) {
NameNode.LOG.warn("Unable to warm up EDEKs.");
NameNode.LOG.warn("Max retry {} reached, unable to warm up EDEKs.", maxRetries);
if (lastSeenIOE != null) {
NameNode.LOG.warn("Last seen exception:", lastSeenIOE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ private boolean isFromProxyUser(CallerContext ctx) {
private ExecutorService edekCacheLoader = null;
private final int edekCacheLoaderDelay;
private final int edekCacheLoaderInterval;
private final int edekCacheLoaderMaxRetries;

/**
* When an active namenode will roll its own edit log, in # edits
Expand Down Expand Up @@ -1012,6 +1013,9 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
this.edekCacheLoaderInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
this.edekCacheLoaderMaxRetries = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_KEY,
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_DEFAULT);

this.leaseRecheckIntervalMs = conf.getLong(
DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY,
Expand Down Expand Up @@ -1470,8 +1474,9 @@ void startActiveServices() throws IOException {
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Warm Up EDEK Cache Thread #%d")
.build());
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
edekCacheLoaderDelay, edekCacheLoaderInterval);
FSDirEncryptionZoneOp
.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval,
edekCacheLoaderMaxRetries);
}
if (blockManager.getSPSManager() != null) {
blockManager.getSPSManager().start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3607,6 +3607,14 @@
</description>
</property>

<property>
<name>dfs.namenode.edekcacheloader.max-retries</name>
<value>10</value>
<description>When KeyProvider is configured, the max retries allowed to attempt
warm up edek cache if none of key successful on NN start up / become active.
</description>
</property>

<property>
<name>dfs.namenode.reencrypt.sleep.interval</name>
<value>1m</value>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;

import org.junit.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class TestFSDirEncryptionZoneOp {

@Test
public void testWarmUpEdekCacheRetries() throws IOException {
NameNode.initMetrics(new Configuration(), NamenodeRole.NAMENODE);

final int initialDelay = 100;
final int retryInterval = 100;
final int maxRetries = 2;

KeyProviderCryptoExtension kpMock = mock(KeyProviderCryptoExtension.class);

doThrow(new IOException())
.doThrow(new IOException())
.doAnswer(invocation -> null)
.when(kpMock).warmUpEncryptedKeys(any());

FSDirEncryptionZoneOp.EDEKCacheLoader loader =
new FSDirEncryptionZoneOp.EDEKCacheLoader(new String[] {"edek1", "edek2"}, kpMock,
initialDelay, retryInterval, maxRetries);

loader.run();

verify(kpMock, times(maxRetries)).warmUpEncryptedKeys(any());
}
}

0 comments on commit 10d763a

Please sign in to comment.