Permalink
Browse files

MODE-1818 Repository configuration can now dictate garbage collection…

… options.

The garbage collection options can now be specified in the repository
configuration. Each repository registers its own garbage collection
processes with a named thread pool, and these processes are stopped
when the repository is shut down.
  • Loading branch information...
rhauch committed Feb 18, 2013
1 parent 788b580 commit 38505e44e81b91cb0a7210d83a838d4949e68bc8
@@ -59,6 +59,36 @@ public static WebArchive createWarDeployment() {
return archive;
}
+ /**
+ * Creates the following deployable:
+ *
+ * cdi-test-ear.ear
+ * /lib/cdi-ear-producer.jar
+ * /META-INF/beans.xml
+ * /cdi-ear-test.war
+ * /WEB-INF/beans.xml
+ * /META-INF/MANIFEST.MF
+ * @return the archive
+ */
+ @Deployment(name = "cdi-test-ear")
+ public static EnterpriseArchive createEarDeployment() {
+
+ JavaArchive providerLib = ShrinkWrap.create(JavaArchive.class, "cdi-ear-producer.jar")
+ .addAsManifestResource(EmptyAsset.INSTANCE, ArchivePaths.create(
+ "beans.xml"))
+ .addClass(CDIRepositoryProvider.class);
+
+ WebArchive webapp = ShrinkWrap.create(WebArchive.class, "cdi-ear-test.war")
+ .addAsWebInfResource(EmptyAsset.INSTANCE, ArchivePaths.create("beans.xml"))
+ .addClass(CDITest.class)
+ .addClass(CDIRepositoryConsumer.class);
+
+ return ShrinkWrap.create(EnterpriseArchive.class, "cdi-ear-test.ear")
+ .addAsModule(webapp)
+ .addAsLibraries(providerLib)
+ .addAsManifestResource(new File("src/main/webapp/META-INF/MANIFEST.MF"));
+ }
+
@Inject
private CDIRepositoryConsumer consumer;
@@ -24,6 +24,7 @@
package org.modeshape.common.util;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -57,7 +58,7 @@
* @param name the name of the thread pool; may not be null
* @return the thread pool executor; never null
*/
- ExecutorService getScheduledThreadPool( String name );
+ ScheduledExecutorService getScheduledThreadPool( String name );
/**
* Performs a {@link java.util.concurrent.ExecutorService#shutdownNow()} on the given pool, if the pool has been created
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.modeshape.common.annotation.ThreadSafe;
@@ -54,9 +55,10 @@ public ExecutorService getCachedTreadPool( String name ) {
}
@Override
- public ExecutorService getScheduledThreadPool( String name ) {
- return getOrCreateNewPool(name,
- Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_THREAD_COUNT, new NamedThreadFactory(name)));
+ public ScheduledExecutorService getScheduledThreadPool( String name ) {
+ return (ScheduledExecutorService)getOrCreateNewPool(name,
+ Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_THREAD_COUNT,
+ new NamedThreadFactory(name)));
}
private ExecutorService getOrCreateNewPool( String name,
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.modeshape.common.SystemFailureException;
import org.modeshape.common.annotation.Immutable;
@@ -418,7 +419,7 @@ public ExecutorService getCachedTreadPool( String name ) {
}
@Override
- public ExecutorService getScheduledThreadPool( String name ) {
+ public ScheduledExecutorService getScheduledThreadPool( String name ) {
return this.threadPools.getScheduledThreadPool(name);
}
@@ -198,6 +198,8 @@
public static I18n errorDuringInitialImport;
public static I18n nodeTypesNotFoundInXml;
+ public static I18n invalidGarbageCollectionInitialTime;
+
public static I18n failedToQueryForDerivedContent;
public static I18n systemSourceNameOptionValueDoesNotReferenceExistingSource;
@@ -461,7 +463,7 @@
public static I18n invalidProjectionPath;
public static I18n invalidProjectionExpression;
public static I18n projectedPathPointsTowardsInternalNode;
-
+
public static I18n reindexMissingNoIndexesExist;
public static I18n noReindex;
public static I18n reindexAll;
@@ -44,13 +44,15 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
import javax.jcr.AccessDeniedException;
import javax.jcr.Credentials;
import javax.jcr.LoginException;
@@ -87,6 +89,7 @@
import org.modeshape.jcr.RepositoryConfiguration.BinaryStorage;
import org.modeshape.jcr.RepositoryConfiguration.Component;
import org.modeshape.jcr.RepositoryConfiguration.FieldName;
+import org.modeshape.jcr.RepositoryConfiguration.GarbageCollection;
import org.modeshape.jcr.RepositoryConfiguration.JaasSecurity;
import org.modeshape.jcr.RepositoryConfiguration.QuerySystem;
import org.modeshape.jcr.RepositoryConfiguration.Security;
@@ -98,6 +101,7 @@
import org.modeshape.jcr.api.Workspace;
import org.modeshape.jcr.api.monitor.ValueMetric;
import org.modeshape.jcr.api.query.Query;
+import org.modeshape.jcr.api.value.DateTime;
import org.modeshape.jcr.bus.ChangeBus;
import org.modeshape.jcr.bus.ClusteredRepositoryChangeBus;
import org.modeshape.jcr.bus.RepositoryChangeBus;
@@ -134,6 +138,7 @@
import org.modeshape.jcr.txn.NoClientTransactions;
import org.modeshape.jcr.txn.SynchronizedTransactions;
import org.modeshape.jcr.txn.Transactions;
+import org.modeshape.jcr.value.DateTimeFactory;
import org.modeshape.jcr.value.Name;
import org.modeshape.jcr.value.NamespaceRegistry;
import org.modeshape.jcr.value.Property;
@@ -868,19 +873,6 @@ private void repositoryNameChanged() {
descriptors.put(Repository.REPOSITORY_NAME, repositoryName());
}
- /**
- * Clean up the repository content's garbage.
- *
- * @see ModeShapeEngine.GarbageCollectionTask#run()
- */
- void cleanUp() {
- RunningState running = runningState.get();
- if (running != null) {
- running.cleanUpLocks();
- running.cleanUpBinaryValues();
- }
- }
-
Collection<Cache<?, ?>> caches() {
RunningState running = runningState.get();
if (running == null) return Collections.emptyList();
@@ -960,6 +952,7 @@ public void notify( ChangeSet changeSet ) {
private final NodeTypesImporter nodeTypesImporter;
private final Connectors connectors;
private final RepositoryConfiguration.IndexRebuildOptions indexRebuildOptions;
+ private final List<ScheduledFuture<?>> gcProcesses = new ArrayList<ScheduledFuture<?>>();
private Transaction runningTransaction;
@@ -987,7 +980,7 @@ protected RunningState( JcrRepository.RunningState other,
this.statistics = other != null ? other.statistics : new RepositoryStatistics(tempContext);
if (this.config.getMonitoring().enabled()) {
// Start the Cron service, with a minimum of a single thread ...
- this.statsRollupService = (ScheduledExecutorService)tempContext.getScheduledThreadPool("modeshape-stats");
+ this.statsRollupService = tempContext.getScheduledThreadPool("modeshape-stats");
this.statistics.start(this.statsRollupService);
} else {
this.statsRollupService = null;
@@ -1309,6 +1302,25 @@ public Void call() throws Exception {
// atomic
this.transactions.resume(runningTransaction);
this.runningTransaction = null;
+
+ // Register the background processes ...
+ GarbageCollection gcConfig = config.getGarbageCollection();
+ String threadPoolName = gcConfig.getThreadPoolName();
+ ScheduledExecutorService garbageCollectionService = this.context.getScheduledThreadPool(threadPoolName);
+ long binaryGcInitialTime = determineInitialDelay(gcConfig.getInitialTimeExpression());
+ long binaryGcInterval = gcConfig.getIntervalInHours();
+ gcProcesses.add(garbageCollectionService.scheduleAtFixedRate(new LockGarbageCollectionTask(),
+ RepositoryConfiguration.LOCK_GARBAGE_COLLECTION_SWEEP_PERIOD,
+ RepositoryConfiguration.LOCK_GARBAGE_COLLECTION_SWEEP_PERIOD,
+ RepositoryConfiguration.LOCK_GARBAGE_COLLECTION_SWEEP_PERIOD_UNIT));
+ if (binaryGcInitialTime >= 0) {
+ gcProcesses.add(garbageCollectionService.scheduleAtFixedRate(new BinaryValueGarbageCollectionTask(),
+ binaryGcInitialTime,
+ binaryGcInterval,
+ TimeUnit.HOURS));
+ } else {
+ logger.warn(JcrI18n.invalidGarbageCollectionInitialTime, repositoryName(), gcConfig.getInitialTimeExpression());
+ }
}
protected final Sequencers sequencers() {
@@ -1506,6 +1518,11 @@ protected void shutdown() {
// shutdown the connectors
this.connectors.shutdown();
+ // Remove the scheduled operations ...
+ for (ScheduledFuture<?> future : gcProcesses) {
+ future.cancel(true);
+ }
+
// Unregister from JNDI ...
unbindFromJndi();
@@ -1645,7 +1662,7 @@ void terminateSessions() {
}
/**
- * @see JcrRepository#cleanUp()
+ * @see LockGarbageCollectionTask
*/
void cleanUpLocks() {
if (logger.isDebugEnabled()) {
@@ -1682,7 +1699,7 @@ void cleanUpLocks() {
}
/**
- * @see JcrRepository#cleanUp()
+ * @see BinaryValueGarbageCollectionTask
*/
void cleanUpBinaryValues() {
if (logger.isDebugEnabled()) {
@@ -1889,4 +1906,38 @@ public void logout() {
}
}
+ protected class BinaryValueGarbageCollectionTask implements Runnable {
+ @Override
+ public void run() {
+ runningState().cleanUpBinaryValues();
+ }
+ }
+
+ protected class LockGarbageCollectionTask implements Runnable {
+ @Override
+ public void run() {
+ runningState().cleanUpLocks();
+ }
+ }
+
+ protected long determineInitialDelay( String initialTimeExpression ) {
+ Matcher matcher = RepositoryConfiguration.INITIAL_TIME_PATTERN.matcher(initialTimeExpression);
+ if (matcher.matches()) {
+ int hours = Integer.decode(matcher.group(1));
+ int mins = Integer.decode(matcher.group(2));
+ DateTimeFactory factory = runningState().context().getValueFactories().getDateFactory();
+ DateTime now = factory.create();
+ DateTime initialTime = factory.create(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), hours, mins, 0, 0);
+ long delay = initialTime.getMilliseconds() - System.currentTimeMillis();
+ if (delay <= 0L) {
+ initialTime = initialTime.plusDays(1);
+ delay = initialTime.getMilliseconds() - System.currentTimeMillis();
+ }
+ assert delay >= 0;
+ return delay;
+ }
+ assert false;
+ return -1;
+ }
+
}
@@ -114,10 +114,6 @@ public void start() {
ThreadFactory cronThreadFactory = new NamedThreadFactory("modeshape-cron");
cronStarterService = new ScheduledThreadPoolExecutor(1, cronThreadFactory);
- // Add a Cron job that cleans up each repository ...
- cronStarterService.scheduleAtFixedRate(new GarbageCollectionTask(), RepositoryConfiguration.GARBAGE_COLLECTION_SWEEP_PERIOD,
- RepositoryConfiguration.GARBAGE_COLLECTION_SWEEP_PERIOD, TimeUnit.MILLISECONDS);
-
state = State.RUNNING;
} catch (RuntimeException e) {
state = State.NOT_RUNNING;
@@ -127,16 +123,6 @@ public void start() {
}
}
- protected class GarbageCollectionTask implements Runnable {
- @Override
- public void run() {
- for (JcrRepository repository : repositories()) {
- // Okay to call, even if not running ...
- repository.cleanUp();
- }
- }
- }
-
/**
* Shutdown this engine to stop all repositories, terminate any ongoing background operations (such as sequencing), and
* reclaim any resources that were acquired by this engine. This method may be called multiple times, but only the first time
@@ -613,7 +599,7 @@ protected JcrRepository deploy( final RepositoryConfiguration repositoryConfigur
} catch (Exception e) {
throw new ConfigurationException(problems, JcrI18n.repositoryConfigurationIsNotValid.text(repositoryName,
e.getMessage()), e);
- }
+ }
return new ImmediateFuture<JcrRepository>(repository);
//
// // Create an initializer that will start the repository ...
Oops, something went wrong.

0 comments on commit 38505e4

Please sign in to comment.