-
Notifications
You must be signed in to change notification settings - Fork 483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDDS-10383. Introduce a Provider for client-side thread resources passing #6222
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xichen01 , thanks for working on this! Please see the comments inlined.
* Provides resources for BlockOutputStream, including executor service, | ||
* and client metrics. | ||
*/ | ||
public final class BlockOutputStreamResourceProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems adding this new BlockOutputStreamResourceProvider
is for parameter passing instead of add a new executorServiceSupplier
parameter. However, executorServiceSupplier
and clientMetrics
are not related.
Let's pass the builders; filed HDDS-10387.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class only has executorServiceSupplier
. Let's pass it directly.
public ExecutorService getWriteThreadPool() { | ||
ExecutorService localRef = writeExecutor; | ||
if (localRef == null) { | ||
synchronized (this) { | ||
localRef = writeExecutor; | ||
if (localRef == null) { | ||
localRef = createThreadPoolExecutor(WRITE_POOL_MIN_SIZE, | ||
Integer.MAX_VALUE, | ||
"client-write-TID-%d"); | ||
writeExecutor = localRef; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use MemoizedSupplier
:
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 94d6ae9769..abb981caa9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -145,6 +145,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,6 +196,9 @@ public class RpcClient implements ClientProtocol {
// for reconstruction.
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+ // TODO: Adjusts to the appropriate value when the writeThreadPool is used.
+ private static final int WRITE_POOL_MIN_SIZE = 0;
+
private final ConfigurationSource conf;
private final OzoneManagerClientProtocol ozoneManagerClient;
private final XceiverClientFactory xceiverClientManager;
@@ -213,7 +217,8 @@ public class RpcClient implements ClientProtocol {
private final ByteBufferPool byteBufferPool;
private final BlockInputStreamFactory blockInputStreamFactory;
private final OzoneManagerVersion omVersion;
- private volatile ExecutorService ecReconstructExecutor;
+ private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
+ private final MemoizedSupplier<ExecutorService> writeExecutor;
private final ContainerClientMetrics clientMetrics;
private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
@@ -237,6 +242,12 @@ public RpcClient(ConfigurationSource conf, String omServiceId)
this.groupRights = aclConfig.getGroupDefaultRights();
this.clientConfig = conf.getObject(OzoneClientConfig.class);
+ this.ecReconstructExecutor = MemoizedSupplier.valueOf(() -> createThreadPoolExecutor(
+ EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+ clientConfig.getEcReconstructStripeReadPoolLimit(),
+ "ec-reconstruct-reader-TID-%d"));
+ this.writeExecutor = MemoizedSupplier.valueOf(() -> createThreadPoolExecutor(
+ WRITE_POOL_MIN_SIZE, Integer.MAX_VALUE, "client-write-TID-%d"));
OmTransport omTransport = createOmTransport(omServiceId);
OzoneManagerProtocolClientSideTranslatorPB
@@ -311,7 +322,7 @@ public void onRemoval(
}).build();
this.byteBufferPool = new ElasticByteBufferPool();
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
- .getInstance(byteBufferPool, this::getECReconstructExecutor);
+ .getInstance(byteBufferPool, ecReconstructExecutor);
this.clientMetrics = ContainerClientMetrics.acquire();
}
@@ -1752,9 +1763,11 @@ private OmKeyInfo getKeyInfo(OmKeyArgs keyArgs) throws IOException {
@Override
public void close() throws IOException {
- if (ecReconstructExecutor != null) {
- ecReconstructExecutor.shutdownNow();
- ecReconstructExecutor = null;
+ if (ecReconstructExecutor.isInitialized()) {
+ ecReconstructExecutor.get().shutdownNow();
+ }
+ if (writeExecutor.isInitialized()) {
+ writeExecutor.get().shutdownNow();
}
IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
keyProviderCache.invalidateAll();
@@ -2496,26 +2509,11 @@ public void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
ozoneManagerClient.setTimes(builder.build(), mtime, atime);
}
- public ExecutorService getECReconstructExecutor() {
- // local ref to a volatile to ensure access
- // to a completed initialized object
- ExecutorService executor = ecReconstructExecutor;
- if (executor == null) {
- synchronized (this) {
- executor = ecReconstructExecutor;
- if (executor == null) {
- ecReconstructExecutor = new ThreadPoolExecutor(
- EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
- clientConfig.getEcReconstructStripeReadPoolLimit(),
- 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
- new ThreadFactoryBuilder()
- .setNameFormat("ec-reconstruct-reader-TID-%d")
- .build(),
- new ThreadPoolExecutor.CallerRunsPolicy());
- executor = ecReconstructExecutor;
- }
- }
- }
- return executor;
+ private static ExecutorService createThreadPoolExecutor(
+ int corePoolSize, int maximumPoolSize, String threadNameFormat) {
+ return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+ 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
# Conflicts: # hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java # hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java # hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java # hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java # hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java # hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java # hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xichen01 , thanks for the update! Please see the comment inlined and also https://issues.apache.org/jira/secure/attachment/13066945/6222_review.patch
* Provides resources for BlockOutputStream, including executor service, | ||
* and client metrics. | ||
*/ | ||
public final class BlockOutputStreamResourceProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class only has executorServiceSupplier
. Let's pass it directly.
Thanks for your comment. I have applied it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 the change looks good.
…sing (apache#6222) (cherry picked from commit f0b75b7)
What changes were proposed in this pull request?
Introduce a Provider for client-side thread resources passing.
Detail: HDDS-9912
The changes here are split from HDDS-9912, which just introduces
BlockOutputStreamResourceProvider
and implements its parameter passing.The changes doesn't actually use
BlockOutputStreamResourceProvider
's thread resources. So the logic of creating and using threads inBlockOutputStream
remains unchanged.What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-10383
How was this patch tested?
Existing Test.