Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,8 @@ void reload()
* Stream of StoreKey instances present in the system.
*/
Stream<StoreKey> streamArtifactStoreKeys();

Set<StoreKey> getStoreKeysByPkg( String pkg );

Set<StoreKey> getStoreKeysByPkgAndType( final String pkg, final StoreType type );
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import org.commonjava.indy.change.event.ArtifactStoreUpdateType;
import org.commonjava.indy.conf.InternalFeatureConfig;
import org.commonjava.indy.conf.SslValidationConfig;
import org.commonjava.indy.data.*;
import org.commonjava.indy.data.ArtifactStoreQuery;
import org.commonjava.indy.data.ArtifactStoreValidateData;
import org.commonjava.indy.data.IndyDataException;
import org.commonjava.indy.data.StoreDataManager;
import org.commonjava.indy.data.StoreEventDispatcher;
import org.commonjava.indy.data.StoreValidator;
import org.commonjava.indy.measure.annotation.Measure;
import org.commonjava.indy.model.core.ArtifactStore;
import org.commonjava.indy.model.core.HostedRepository;
Expand All @@ -32,14 +37,14 @@
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.commonjava.indy.model.core.StoreType.hosted;
Expand Down Expand Up @@ -404,4 +409,17 @@ public void disableNotValidStore(ArtifactStore store,ArtifactStoreValidateData v
// }
}

@Override
public Set<StoreKey> getStoreKeysByPkg( String pkg )
{
return streamArtifactStoreKeys().filter( key -> key.getPackageType().equals( pkg ) )
.collect( Collectors.toSet() );
}

@Override
public Set<StoreKey> getStoreKeysByPkgAndType( final String pkg, final StoreType type )
{
return streamArtifactStoreKeys().filter( key -> key.getPackageType().equals( pkg ) && key.getType() == type )
.collect( Collectors.toSet() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.commonjava.indy.db.common;

import org.apache.commons.lang3.StringUtils;
import org.commonjava.indy.data.ArtifactStoreQuery;
import org.commonjava.indy.data.IndyDataException;
import org.commonjava.indy.data.StoreDataManager;
Expand Down Expand Up @@ -46,6 +47,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.commonjava.indy.model.core.StoreType.group;

/**
* This query interface is intended to be reusable across any {@link StoreDataManager} implementation. It contains logic
* for working with the {@link ArtifactStore}s contained in the StoreDataManager, but this logic is not tied directly to
Expand Down Expand Up @@ -122,7 +125,7 @@ else if ( HostedRepository.class.equals( storeCls ) )
}
else
{
this.types = Collections.singleton( StoreType.group );
this.types = Collections.singleton( group );
}

return (DefaultArtifactStoreQuery<C>) this;
Expand Down Expand Up @@ -414,8 +417,7 @@ public Set<Group> getGroupsAffectedBy( Collection<StoreKey> keys )

Set<StoreKey> processed = new HashSet<>();

Set<StoreKey> all = new DefaultArtifactStoreQuery<>( dataManager, toProcess.get( 0 ).getPackageType(), null,
Group.class ).keyStream().collect( Collectors.toSet() );
Set<StoreKey> all = dataManager.getStoreKeysByPkgAndType( toProcess.get( 0 ).getPackageType(), group );

logger.debug( "There are {} groups need to loop checking for affected by", all.size() );

Expand Down Expand Up @@ -470,12 +472,16 @@ public Stream<StoreKey> keyStream()

public Stream<StoreKey> keyStream( Predicate<StoreKey> filterPredicate )
{
return dataManager.streamArtifactStoreKeys().filter(key -> {
if ( packageType != null && !key.getPackageType().equals( packageType ))
{
return false;
}

final Stream<StoreKey> storeKeys;
if ( StringUtils.isNotBlank( this.packageType ) )
{
storeKeys = dataManager.getStoreKeysByPkg( this.packageType ).stream();
}
else
{
storeKeys = dataManager.streamArtifactStoreKeys();
}
return storeKeys.filter(key -> {
if ( types != null && !types.isEmpty() && !types.contains( key.getType() ) )
{
return false;
Expand Down Expand Up @@ -553,7 +559,7 @@ public HostedRepository getHostedRepository( final String name )
public Group getGroup( final String name )
throws IndyDataException
{
return (Group) dataManager.getArtifactStore( new StoreKey( packageType, StoreType.group, name ) );
return (Group) dataManager.getArtifactStore( new StoreKey( packageType, group, name ) );
}

@Override
Expand All @@ -572,7 +578,7 @@ private List<ArtifactStore> getGroupOrdering( final String groupName,
throw new IndyDataException( "packageType must be set on the query before calling this method!" );
}

final Group master = (Group) dataManager.getArtifactStore( new StoreKey( packageType, StoreType.group, groupName ) );
final Group master = (Group) dataManager.getArtifactStore( new StoreKey( packageType, group, groupName ) );
if ( master == null )
{
return Collections.emptyList();
Expand Down Expand Up @@ -617,7 +623,7 @@ private void recurseGroup( final Group master,
final StoreType type = key.getType();
try
{
if ( recurseGroups && type == StoreType.group )
if ( recurseGroups && type == group )
{
// if we're here, we're definitely recursing groups...
Group group = (Group) dataManager.getArtifactStore( key );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.commonjava.indy.measure.annotation.Measure;
import org.commonjava.indy.model.core.ArtifactStore;
import org.commonjava.indy.model.core.StoreKey;
import org.commonjava.indy.model.core.StoreType;
import org.commonjava.indy.subsys.infinispan.CacheHandle;
import org.infinispan.Cache;
import org.slf4j.Logger;
Expand All @@ -31,12 +32,14 @@
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Alternative;
import javax.inject.Inject;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static org.commonjava.indy.infinispan.data.StoreDataCacheProducer.STORE_BY_PKG_CACHE;
import static org.commonjava.indy.infinispan.data.StoreDataCacheProducer.STORE_DATA_CACHE;

@ApplicationScoped
Expand All @@ -50,6 +53,10 @@ public class InfinispanStoreDataManager
@StoreDataCache
private CacheHandle<StoreKey, ArtifactStore> stores;

@Inject
@StoreByPkgCache
private CacheHandle<String, Map<StoreType, Set<StoreKey>>> storesByPkg;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can really only work if it's never persisted or clustered. That might cause problems if we try to deploy in a multi-node scenario.

Otherwise, I think the write latency on this cache combined with the synchronization of all store writes will crush performance. I could be misreading that, maybe I'm missing something.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this cache is a pure in-memory cache, whose content is built up from the store cache during each startup. I agree that this can not work well in multi-node scenario. If we want this to work in distributed world, persistence or distributed cache is always needed.


@Inject
private StoreEventDispatcher dispatcher;

Expand All @@ -65,14 +72,31 @@ protected InfinispanStoreDataManager()
}

@PostConstruct
private void init()
synchronized void init()
{
// re-fill the stores by package cache each time when reboot
if ( storesByPkg != null )
{
logger.info( "Clean the stores-by-pkg cache" );
storesByPkg.clear();
}
final Set<ArtifactStore> allStores = getAllArtifactStores();
logger.info( "There are {} stores need to fill in stores-by-pkg cache", allStores.size() );
for ( ArtifactStore store : allStores )
{
final Map<StoreType, Set<StoreKey>> typedKeys =
storesByPkg.computeIfAbsent( store.getKey().getPackageType(), k -> new HashMap<>() );
final Set<StoreKey> keys = typedKeys.computeIfAbsent( store.getKey().getType(), k -> new HashSet<>() );
keys.add( store.getKey() );
}
}

public InfinispanStoreDataManager( final Cache<StoreKey, ArtifactStore> cache )
public InfinispanStoreDataManager( final Cache<StoreKey, ArtifactStore> cache,
final Cache<String, Map<StoreType, Set<StoreKey>>> storesByPkg )
{
this.dispatcher = new NoOpStoreEventDispatcher();
this.stores = new CacheHandle( STORE_DATA_CACHE, cache );
this.storesByPkg = new CacheHandle( STORE_BY_PKG_CACHE, storesByPkg );
}

@Override
Expand All @@ -81,17 +105,27 @@ protected ArtifactStore getArtifactStoreInternal( StoreKey key )
return stores.get( key );
}


@Override
protected ArtifactStore removeArtifactStoreInternal( StoreKey key )
protected synchronized ArtifactStore removeArtifactStoreInternal( StoreKey key )
{
return stores.remove( key );
final ArtifactStore store = stores.remove( key );
final Map<StoreType, Set<StoreKey>> typedKeys = storesByPkg.get( key.getPackageType() );
if ( typedKeys != null )
{
final Set<StoreKey> keys = typedKeys.get( key.getType() );
if ( keys != null )
{
keys.remove( key );
}
}
return store;
}

@Override
public void clear( final ChangeSummary summary )
{
stores.clear();
storesByPkg.clear();
}

@Override
Expand Down Expand Up @@ -139,9 +173,49 @@ public Stream<StoreKey> streamArtifactStoreKeys()
}

@Override
protected ArtifactStore putArtifactStoreInternal( StoreKey storeKey, ArtifactStore store )
protected synchronized ArtifactStore putArtifactStoreInternal( StoreKey storeKey, ArtifactStore store )
{
final ArtifactStore added = stores.put( storeKey, store );
final Map<StoreType, Set<StoreKey>> typedKeys =
storesByPkg.computeIfAbsent( storeKey.getPackageType(), k -> new HashMap<>() );
final Set<StoreKey> keys = typedKeys.computeIfAbsent( storeKey.getType(), k -> new HashSet<>() );
Comment thread
jdcasey marked this conversation as resolved.
keys.add( storeKey );
return added;
}

@Override
public Set<StoreKey> getStoreKeysByPkg( final String pkg )
{
final Map<StoreType, Set<StoreKey>> typedKeys = storesByPkg.get( pkg );
if ( typedKeys != null )
{
final Set<StoreKey> keys = new HashSet<>();
typedKeys.values().forEach( keys::addAll );
logger.trace( "There are {} stores for package type {}", keys.size(), pkg );
return keys;
}
else
{
logger.trace( "There is no store for package type {}", pkg );
return Collections.emptySet();
}
}

@Override
public Set<StoreKey> getStoreKeysByPkgAndType( final String pkg, final StoreType type )
{
return stores.put( storeKey, store );
final Map<StoreType, Set<StoreKey>> typedKeys = storesByPkg.get( pkg );
if ( typedKeys != null )
{
final Set<StoreKey> keys = typedKeys.get( type );
if ( keys != null )
{
logger.trace( "There are {} stores for package type {} with type {}", keys.size(), pkg, type );
return new HashSet<>( keys );
}
}
logger.trace( "There is no store for package type {} with type {}", pkg, type );
return Collections.emptySet();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.commonjava.indy.infinispan.data;

import javax.inject.Qualifier;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Qualifier
@Target( { ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention( RetentionPolicy.RUNTIME)
@Documented
public @interface StoreByPkgCache
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@

import org.commonjava.indy.model.core.ArtifactStore;
import org.commonjava.indy.model.core.StoreKey;
import org.commonjava.indy.model.core.StoreType;
import org.commonjava.indy.subsys.infinispan.CacheHandle;
import org.commonjava.indy.subsys.infinispan.CacheProducer;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import java.util.Map;
import java.util.Set;

public class StoreDataCacheProducer
{
public static final String STORE_DATA_CACHE = "store-data-v2";

public static final String STORE_BY_PKG_CACHE = "store-by-package";

@Inject
private CacheProducer cacheProducer;

Expand All @@ -47,5 +52,12 @@ public CacheHandle<StoreKey, ArtifactStore> getStoreDataCache()
// return cacheProducer.getCache( STORE_DATA_CACHE );
// }

@StoreByPkgCache
@Produces
@ApplicationScoped
public CacheHandle<String, Map<StoreType, Set<StoreKey>>> getStoreByPkgCache()
{
return cacheProducer.getCache( STORE_BY_PKG_CACHE );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import org.commonjava.indy.data.StoreDataManager;
import org.commonjava.indy.model.core.ArtifactStore;
import org.commonjava.indy.model.core.StoreKey;
import org.commonjava.indy.model.core.StoreType;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;

import java.util.Map;
import java.util.Set;

import static org.commonjava.indy.infinispan.data.StoreDataCacheProducer.STORE_BY_PKG_CACHE;
import static org.commonjava.indy.infinispan.data.StoreDataCacheProducer.STORE_DATA_CACHE;

public class InfinispanTCKFixtureProvider
Expand All @@ -35,7 +40,9 @@ protected void init()
DefaultCacheManager cacheManager =
new DefaultCacheManager( new ConfigurationBuilder().simpleCache( true ).build() );
Cache<StoreKey, ArtifactStore> storeCache = cacheManager.getCache( STORE_DATA_CACHE, true );
dataManager = new InfinispanStoreDataManager( storeCache );
Cache<String, Map<StoreType, Set<StoreKey>>> storesByPkgCache = cacheManager.getCache( STORE_BY_PKG_CACHE, true );
dataManager = new InfinispanStoreDataManager( storeCache, storesByPkgCache );
dataManager.init();
}

@Override
Expand Down
12 changes: 11 additions & 1 deletion subsys/infinispan/src/main/resources/infinispan.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,22 @@
</persistence>
</local-cache>

<local-cache name="store-data" configuration="local-template">
<local-cache name="store-data-v2" configuration="local-template">
<persistence passivation="true">
<file-store shared="false" preload="true" fetch-state="false" path="${indy.data}/store-v2"/>
</persistence>
</local-cache>

<local-cache name="store-by-package" configuration="local-template">
<memory>
<object size="100" />
</memory>
<indexing index="LOCAL">
<property name="default.indexmanager">near-real-time</property>
<property name="default.directory_provider">local-heap</property>
</indexing>
</local-cache>

<!--
A clustered lock is a lock which is distributed and shared among all nodes in the Infinispan cluster and
provides a way to execute code that will be synchronized between the nodes. Since 9.x.
Expand Down