Skip to content

Commit

Permalink
[MRESOLVER-554] Split thread counts for up and downstream ops (#489)
Browse files Browse the repository at this point in the history
Basic connector new config to be able to differentiate thread counts used for upload and download.

---

https://issues.apache.org/jira/browse/MRESOLVER-554
  • Loading branch information
cstamas committed May 2, 2024
1 parent e372904 commit b5f8388
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 108 deletions.
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -68,10 +69,12 @@
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_DOWNSTREAM_THREADS;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_UPSTREAM_THREADS;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
Expand All @@ -97,15 +100,17 @@ final class BasicRepositoryConnector implements RepositoryConnector {

private final ChecksumPolicyProvider checksumPolicyProvider;

private final int maxThreads;
private final int maxDownstreamThreads;

private final int maxUpstreamThreads;

private final boolean smartChecksums;

private final boolean parallelPut;

private final boolean persistedChecksums;

private Executor executor;
private final ConcurrentHashMap<Boolean, Executor> executors;

private final AtomicBoolean closed;

Expand Down Expand Up @@ -134,9 +139,21 @@ final class BasicRepositoryConnector implements RepositoryConnector {
this.repository = repository;
this.checksumProcessor = checksumProcessor;
this.providedChecksumsSources = providedChecksumsSources;
this.executors = new ConcurrentHashMap<>();
this.closed = new AtomicBoolean(false);

maxThreads = ExecutorUtils.threadCount(session, DEFAULT_THREADS, CONFIG_PROP_THREADS);
maxUpstreamThreads = ExecutorUtils.threadCount(
session,
DEFAULT_THREADS,
CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
CONFIG_PROP_UPSTREAM_THREADS,
CONFIG_PROP_THREADS);
maxDownstreamThreads = ExecutorUtils.threadCount(
session,
DEFAULT_THREADS,
CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
CONFIG_PROP_DOWNSTREAM_THREADS,
CONFIG_PROP_THREADS);
smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
parallelPut = ConfigUtils.getBoolean(
session,
Expand All @@ -147,24 +164,26 @@ final class BasicRepositoryConnector implements RepositoryConnector {
ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
}

private Executor getExecutor(int tasks) {
private Executor getExecutor(boolean downstream, int tasks) {
int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
if (maxThreads <= 1) {
return ExecutorUtils.DIRECT_EXECUTOR;
}
if (tasks <= 1) {
return ExecutorUtils.DIRECT_EXECUTOR;
}
if (executor == null) {
executor =
ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
}
return executor;
return executors.computeIfAbsent(
downstream,
k -> ExecutorUtils.threadPool(
maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ExecutorUtils.shutdown(executor);
for (Executor executor : executors.values()) {
ExecutorUtils.shutdown(executor);
}
transporter.close();
}
}
Expand All @@ -184,7 +203,7 @@ public void get(
Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);

Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
Executor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();

Expand Down Expand Up @@ -276,7 +295,8 @@ public void put(
Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);

Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
Executor executor =
getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();

boolean first = true;
Expand Down
Expand Up @@ -50,16 +50,41 @@ private BasicRepositoryConnectorConfigurationKeys() {}
public static final boolean DEFAULT_PERSISTED_CHECKSUMS = true;

/**
* Number of threads in basic connector for uploading/downloading.
* Number of threads in basic connector for uploading/downloading. Observed only if some of the
* upstream or downstream threads are not set. (Deprecated)
*
* @since 0.9.0.M4
* @configurationSource {@link RepositorySystemSession#getConfigProperties()}
* @configurationType {@link java.lang.Integer}
* @configurationDefaultValue {@link #DEFAULT_THREADS}
* @configurationRepoIdSuffix No
* @deprecated Use {@link #CONFIG_PROP_UPSTREAM_THREADS} and {@link #CONFIG_PROP_DOWNSTREAM_THREADS} instead.
*/
@Deprecated
public static final String CONFIG_PROP_THREADS = CONFIG_PROPS_PREFIX + "threads";

/**
* Number of threads in basic connector for uploading.
*
* @since 2.0.0
* @configurationSource {@link RepositorySystemSession#getConfigProperties()}
* @configurationType {@link java.lang.Integer}
* @configurationDefaultValue {@link #DEFAULT_THREADS}
* @configurationRepoIdSuffix Yes
*/
public static final String CONFIG_PROP_UPSTREAM_THREADS = CONFIG_PROPS_PREFIX + "upstreamThreads";

/**
* Number of threads in basic connector for downloading.
*
* @since 2.0.0
* @configurationSource {@link RepositorySystemSession#getConfigProperties()}
* @configurationType {@link java.lang.Integer}
* @configurationDefaultValue {@link #DEFAULT_THREADS}
* @configurationRepoIdSuffix Yes
*/
public static final String CONFIG_PROP_DOWNSTREAM_THREADS = CONFIG_PROPS_PREFIX + "downstreamThreads";

public static final int DEFAULT_THREADS = 5;

/**
Expand Down

0 comments on commit b5f8388

Please sign in to comment.