From 5ce731254e0b01f340d05ef1caddfff612bcbadf Mon Sep 17 00:00:00 2001 From: Eric Cai Date: Wed, 26 Oct 2022 14:35:13 +0800 Subject: [PATCH] [MRESOLVER-7] Download POMs in parallel in DF collector (#178) Parallel POM downloading in DF collector. - only parallelize the VersionRange and ArtifactDesciptor resolution part (poms.xml & maven-metadata.xml downloading in parallel) as the poms downloading or maven-metadata.xml is the most slow part. - Pure dependency resolution logic and the [skip logic](https://github.com/apache/maven-resolver/pull/158) are still run in sequence. --- https://issues.apache.org/jira/browse/MRESOLVER-7 --- .../collect/bf/BfDependencyCollector.java | 284 +++++++++++++++--- .../bf/DependencyProcessingContext.java | 17 +- 2 files changed, 252 insertions(+), 49 deletions(-) diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java index c82eed941..6145c595a 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java @@ -26,12 +26,28 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Queue; - +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.RequestTrace; import org.eclipse.aether.artifact.Artifact; +import org.eclipse.aether.artifact.ArtifactType; +import org.eclipse.aether.artifact.DefaultArtifact; import org.eclipse.aether.collection.CollectRequest; import org.eclipse.aether.collection.DependencyManager; import org.eclipse.aether.collection.DependencySelector; @@ -53,12 +69,15 @@ import org.eclipse.aether.resolution.ArtifactDescriptorRequest; import org.eclipse.aether.resolution.ArtifactDescriptorResult; import org.eclipse.aether.resolution.VersionRangeRequest; -import org.eclipse.aether.resolution.VersionRangeResolutionException; import org.eclipse.aether.resolution.VersionRangeResult; import org.eclipse.aether.spi.locator.Service; import org.eclipse.aether.util.ConfigUtils; +import org.eclipse.aether.util.artifact.ArtifactIdUtils; +import org.eclipse.aether.util.concurrency.WorkerThreadFactory; import org.eclipse.aether.util.graph.manager.DependencyManagerUtils; import org.eclipse.aether.version.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.eclipse.aether.internal.impl.collect.DefaultDependencyCycle.find; @@ -128,7 +147,8 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr Args args = new Args( session, pool, context, versionContext, request, useSkip ? DependencyResolutionSkipper.defaultSkipper() - : DependencyResolutionSkipper.neverSkipper() ); + : DependencyResolutionSkipper.neverSkipper(), + new ParallelDescriptorResolver( session ) ); DependencySelector rootDepSelector = session.getDependencySelector() != null ? session.getDependencySelector().deriveChildSelector( context ) : null; @@ -142,77 +162,65 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr List parents = Collections.singletonList( node ); for ( Dependency dependency : dependencies ) { - args.dependencyProcessingQueue.add( + RequestTrace childTrace = collectStepTrace( trace, args.request.getRequestContext(), parents, + dependency ); + DependencyProcessingContext processingContext = new DependencyProcessingContext( rootDepSelector, rootDepManager, rootDepTraverser, - rootVerFilter, repositories, managedDependencies, parents, - dependency ) ); + rootVerFilter, childTrace, repositories, managedDependencies, parents, dependency, + PremanagedDependency.create( rootDepManager, dependency, + false, args.premanagedState ) ); + if ( !filter( processingContext ) ) + { + processingContext.withDependency( processingContext.premanagedDependency.getManagedDependency() ); + resolveArtifactDescriptorAsync( args, processingContext, results ); + args.dependencyProcessingQueue.add( processingContext ); + } } while ( !args.dependencyProcessingQueue.isEmpty() ) { - processDependency( args, trace, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), + processDependency( args, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), false ); } + args.resolver.shutdown(); args.skipper.report(); } @SuppressWarnings( "checkstyle:parameternumber" ) - private void processDependency( Args args, RequestTrace parent, Results results, + private void processDependency( Args args, Results results, DependencyProcessingContext context, List relocations, boolean disableVersionManagement ) { - if ( context.depSelector != null && !context.depSelector.selectDependency( context.dependency ) ) - { - return; - } - - RequestTrace trace = collectStepTrace( parent, args.request.getRequestContext(), context.parents, - context.dependency ); - PremanagedDependency preManaged = - PremanagedDependency.create( context.depManager, context.dependency, disableVersionManagement, - args.premanagedState ); - Dependency dependency = preManaged.getManagedDependency(); + Dependency dependency = context.dependency; + PremanagedDependency preManaged = context.premanagedDependency; boolean noDescriptor = isLackingDescriptor( dependency.getArtifact() ); - boolean traverse = !noDescriptor && ( context.depTraverser == null || context.depTraverser.traverseDependency( dependency ) ); - List versions; + Future resolutionResultFuture = args.resolver.find( dependency.getArtifact() ); + DescriptorResolutionResult resolutionResult; VersionRangeResult rangeResult; try { - VersionRangeRequest rangeRequest = createVersionRangeRequest( args.request.getRequestContext(), trace, - context.repositories, dependency ); - - rangeResult = cachedResolveRangeResult( rangeRequest, args.pool, args.session ); - - versions = filterVersions( dependency, rangeResult, context.verFilter, args.versionContext ); + resolutionResult = resolutionResultFuture.get(); + rangeResult = resolutionResult.rangeResult; } - catch ( VersionRangeResolutionException e ) + catch ( Exception e ) { results.addException( dependency, e, context.parents ); return; } - //Resolve newer version first to maximize benefits of skipper - Collections.reverse( versions ); + Set versions = resolutionResult.descriptors.keySet(); for ( Version version : versions ) { Artifact originalArtifact = dependency.getArtifact().setVersion( version.toString() ); Dependency d = dependency.setArtifact( originalArtifact ); - ArtifactDescriptorRequest descriptorRequest = createArtifactDescriptorRequest( - args.request.getRequestContext(), trace, context.repositories, d ); - - final ArtifactDescriptorResult descriptorResult = - noDescriptor - ? new ArtifactDescriptorResult( descriptorRequest ) - : resolveCachedArtifactDescriptor( args.pool, descriptorRequest, args.session, - context.withDependency( d ), results ); - + final ArtifactDescriptorResult descriptorResult = resolutionResult.descriptors.get( version ); if ( descriptorResult != null ) { d = d.setArtifact( descriptorResult.getArtifact() ); @@ -238,8 +246,24 @@ private void processDependency( Args args, RequestTrace parent, Results results, originalArtifact.getGroupId().equals( d.getArtifact().getGroupId() ) && originalArtifact.getArtifactId().equals( d.getArtifact().getArtifactId() ); - processDependency( args, parent, results, context.withDependency( d ), - descriptorResult.getRelocations(), disableVersionManagementSubsequently ); + PremanagedDependency premanagedDependency = + PremanagedDependency.create( context.depManager, d, disableVersionManagementSubsequently, + args.premanagedState ); + DependencyProcessingContext relocatedContext = + new DependencyProcessingContext( context.depSelector, context.depManager, + context.depTraverser, context.verFilter, + context.trace, context.repositories, descriptorResult.getManagedDependencies(), + context.parents, + d, premanagedDependency ); + + if ( !filter( relocatedContext ) ) + { + relocatedContext.withDependency( premanagedDependency.getManagedDependency() ); + resolveArtifactDescriptorAsync( args, relocatedContext, results ); + processDependency( args, results, relocatedContext, descriptorResult.getRelocations(), + disableVersionManagementSubsequently ); + } + return; } else @@ -259,7 +283,8 @@ private void processDependency( Args args, RequestTrace parent, Results results, DependencyProcessingContext parentContext = context.withDependency( d ); if ( recurse ) { - doRecurse( args, parentContext, descriptorResult, child ); + doRecurse( args, parentContext, descriptorResult, child, results, + disableVersionManagement ); } else if ( !args.skipper.skipResolution( child, parentContext.parents ) ) { @@ -284,7 +309,8 @@ else if ( !args.skipper.skipResolution( child, parentContext.parents ) ) @SuppressWarnings( "checkstyle:parameternumber" ) private void doRecurse( Args args, DependencyProcessingContext parentContext, - ArtifactDescriptorResult descriptorResult, DefaultDependencyNode child ) + ArtifactDescriptorResult descriptorResult, DefaultDependencyNode child, Results results, + boolean disableVersionManagement ) { DefaultDependencyCollectionContext context = args.collectionContext; context.set( parentContext.dependency, descriptorResult.getManagedDependencies() ); @@ -319,10 +345,24 @@ private void doRecurse( Args args, DependencyProcessingContext parentContext, parents.add( child ); for ( Dependency dependency : descriptorResult.getDependencies() ) { - args.dependencyProcessingQueue.add( + RequestTrace childTrace = + collectStepTrace( parentContext.trace, args.request.getRequestContext(), parents, + dependency ); + PremanagedDependency premanagedDependency = + PremanagedDependency.create( childManager, dependency, disableVersionManagement, + args.premanagedState ); + DependencyProcessingContext processingContext = new DependencyProcessingContext( childSelector, childManager, childTraverser, childFilter, - childRepos, descriptorResult.getManagedDependencies(), parents, dependency ) ); - + childTrace, childRepos, descriptorResult.getManagedDependencies(), parents, + dependency, premanagedDependency ); + if ( !filter( processingContext ) ) + { + //resolve descriptors ahead for managed dependency + processingContext.withDependency( + processingContext.premanagedDependency.getManagedDependency() ); + resolveArtifactDescriptorAsync( args, processingContext, results ); + args.dependencyProcessingQueue.add( processingContext ); + } } args.pool.putChildren( key, child.getChildren() ); args.skipper.cache( child, parents ); @@ -334,6 +374,67 @@ private void doRecurse( Args args, DependencyProcessingContext parentContext, } } + private boolean filter( DependencyProcessingContext context ) + { + return context.depSelector != null && !context.depSelector.selectDependency( context.dependency ); + } + + + private void resolveArtifactDescriptorAsync( Args args, DependencyProcessingContext context, + Results results ) + { + Dependency dependency = context.dependency; + args.resolver.resolveDescriptors( dependency.getArtifact(), () -> + { + VersionRangeRequest rangeRequest = + createVersionRangeRequest( args.request.getRequestContext(), context.trace, context.repositories, + dependency ); + VersionRangeResult rangeResult = cachedResolveRangeResult( rangeRequest, args.pool, args.session ); + List versions = filterVersions( dependency, rangeResult, context.verFilter, + args.versionContext ); + + //resolve newer version first to maximize benefits of skipper + Collections.reverse( versions ); + + Map descriptors = new ConcurrentHashMap<>( versions.size() ); + Stream stream = versions.size() > 1 ? versions.parallelStream() : versions.stream(); + stream.forEach( version -> + Optional.ofNullable( resolveDescriptorForVersion( args, context, results, dependency, version ) ) + .ifPresent( r -> descriptors.put( version, r ) ) + ); + + DescriptorResolutionResult resolutionResult = + new DescriptorResolutionResult( dependency.getArtifact(), rangeResult ); + //keep original sequence + versions.forEach( version -> resolutionResult.descriptors.put( version, descriptors.get( version ) ) ); + //populate for versions in version range + resolutionResult.flatten().forEach( dr -> args.resolver.cacheVersionRangeDescriptor( dr.artifact, dr ) ); + + return resolutionResult; + } ); + } + + private ArtifactDescriptorResult resolveDescriptorForVersion( Args args, DependencyProcessingContext context, + Results results, Dependency dependency, + Version version ) + { + Artifact original = dependency.getArtifact(); + Artifact newArtifact = new DefaultArtifact( original.getGroupId(), + original.getArtifactId(), original.getClassifier(), original.getExtension(), + version.toString(), original.getProperties(), (ArtifactType) null ); + Dependency newDependency = new Dependency( newArtifact, dependency.getScope(), dependency.isOptional(), + dependency.getExclusions() ); + DependencyProcessingContext newContext = context.copy(); + + ArtifactDescriptorRequest descriptorRequest = + createArtifactDescriptorRequest( args.request.getRequestContext(), context.trace, + newContext.repositories, newDependency ); + return isLackingDescriptor( newArtifact ) + ? new ArtifactDescriptorResult( descriptorRequest ) + : resolveCachedArtifactDescriptor( args.pool, descriptorRequest, args.session, + newContext.withDependency( newDependency ), results ); + } + private ArtifactDescriptorResult resolveCachedArtifactDescriptor( DataPool pool, ArtifactDescriptorRequest descriptorRequest, RepositorySystemSession session, @@ -365,6 +466,89 @@ else if ( descriptorResult == DataPool.NO_DESCRIPTOR ) return descriptorResult; } + static class ParallelDescriptorResolver + { + final ExecutorService executorService; + + /** + * Artifact ID -> Future of DescriptorResolutionResult + */ + final Map> results = new ConcurrentHashMap<>( 256 ); + final Logger logger = LoggerFactory.getLogger( getClass() ); + + ParallelDescriptorResolver( RepositorySystemSession session ) + { + this.executorService = getExecutorService( session ); + } + + void resolveDescriptors( Artifact artifact, Callable callable ) + { + results.computeIfAbsent( ArtifactIdUtils.toId( artifact ), + key -> this.executorService.submit( callable ) ); + } + + void cacheVersionRangeDescriptor( Artifact artifact, DescriptorResolutionResult resolutionResult ) + { + results.computeIfAbsent( ArtifactIdUtils.toId( artifact ), + key -> ConcurrentUtils.constantFuture( resolutionResult ) ); + } + + Future find( Artifact artifact ) + { + return results.get( ArtifactIdUtils.toId( artifact ) ); + } + + void shutdown() + { + executorService.shutdown(); + } + + private ExecutorService getExecutorService( RepositorySystemSession session ) + { + int nThreads = ConfigUtils.getInteger( session, 5, "maven.artifact.threads" ); + logger.debug( "Created thread pool with {} threads to resolve descriptors.", nThreads ); + return new ThreadPoolExecutor( nThreads, nThreads, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue(), + new WorkerThreadFactory( getClass().getSimpleName() ) ); + } + } + + static class DescriptorResolutionResult + { + Artifact artifact; + + VersionRangeResult rangeResult; + + Map descriptors; + + DescriptorResolutionResult( Artifact artifact, VersionRangeResult rangeResult ) + { + this.artifact = artifact; + this.rangeResult = rangeResult; + this.descriptors = new LinkedHashMap<>( rangeResult.getVersions().size() ); + } + + DescriptorResolutionResult( VersionRangeResult rangeResult, + Version version, ArtifactDescriptorResult descriptor ) + { + this( descriptor.getArtifact(), rangeResult ); + this.descriptors.put( version, descriptor ); + } + + List flatten() + { + if ( descriptors.size() > 1 ) + { + return descriptors.entrySet().stream() + .map( e -> new DescriptorResolutionResult( rangeResult, e.getKey(), e.getValue() ) ) + .collect( Collectors.toList() ); + } + else + { + return Collections.emptyList(); + } + } + } + static class Args { @@ -386,9 +570,12 @@ static class Args final DependencyResolutionSkipper skipper; + final ParallelDescriptorResolver resolver; + Args( RepositorySystemSession session, DataPool pool, - DefaultDependencyCollectionContext collectionContext, DefaultVersionFilterContext versionContext, - CollectRequest request, DependencyResolutionSkipper skipper ) + DefaultDependencyCollectionContext collectionContext, DefaultVersionFilterContext versionContext, + CollectRequest request, DependencyResolutionSkipper skipper, + ParallelDescriptorResolver resolver ) { this.session = session; this.request = request; @@ -398,6 +585,7 @@ static class Args this.collectionContext = collectionContext; this.versionContext = versionContext; this.skipper = skipper; + this.resolver = resolver; } } diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/DependencyProcessingContext.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/DependencyProcessingContext.java index ee2e77204..508d2d04d 100644 --- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/DependencyProcessingContext.java +++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/DependencyProcessingContext.java @@ -21,12 +21,14 @@ import java.util.List; +import org.eclipse.aether.RequestTrace; import org.eclipse.aether.collection.DependencyManager; import org.eclipse.aether.collection.DependencySelector; import org.eclipse.aether.collection.DependencyTraverser; import org.eclipse.aether.collection.VersionFilter; import org.eclipse.aether.graph.Dependency; import org.eclipse.aether.graph.DependencyNode; +import org.eclipse.aether.internal.impl.collect.PremanagedDependency; import org.eclipse.aether.repository.RemoteRepository; /** @@ -47,6 +49,8 @@ final class DependencyProcessingContext * All parents of the dependency in the top > down order. */ final List parents; + final PremanagedDependency premanagedDependency; + final RequestTrace trace; Dependency dependency; @SuppressWarnings( "checkstyle:parameternumber" ) @@ -54,17 +58,21 @@ final class DependencyProcessingContext DependencyManager depManager, DependencyTraverser depTraverser, VersionFilter verFilter, + RequestTrace trace, List repositories, List managedDependencies, List parents, - Dependency dependency ) + Dependency dependency, + PremanagedDependency premanagedDependency ) { this.depSelector = depSelector; this.depManager = depManager; this.depTraverser = depTraverser; this.verFilter = verFilter; + this.trace = trace; this.repositories = repositories; this.dependency = dependency; + this.premanagedDependency = premanagedDependency; this.managedDependencies = managedDependencies; this.parents = parents; } @@ -75,6 +83,13 @@ DependencyProcessingContext withDependency( Dependency dependency ) return this; } + DependencyProcessingContext copy() + { + return new DependencyProcessingContext( depSelector, depManager, depTraverser, + verFilter, trace, repositories, managedDependencies, parents, dependency, + premanagedDependency ); + } + DependencyNode getParent() { return parents.get( parents.size() - 1 );