Skip to content

Commit

Permalink
Various refactorings in support of [IMMUTANT-143]
Browse files Browse the repository at this point in the history
Including:

* JobScheduler is now an MSC service instead of being created ad hoc
* application initialization now happens asynchronously, but in an MSC
  service so it will still complete before the deployment is
  considered complete
  • Loading branch information
tobias committed Apr 17, 2013
1 parent 3e71d2f commit 1877b94
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 150 deletions.
@@ -0,0 +1,73 @@
/*
* Copyright 2008-2013 Red Hat, Inc, and individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.immutant.core;

import org.immutant.runtime.ClojureRuntime;
import org.jboss.msc.inject.Injector;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
import org.projectodd.polyglot.core.AsyncService;

public class ApplicationInitializer<Void> extends AsyncService<Void> {

public ApplicationInitializer(String fullAppConfig,
String leinProject,
ClojureMetaData metaData) {
this.fullAppConfig = fullAppConfig;
this.leinProject = leinProject;
this.metaData = metaData;
}

@Override
public void startAsync(StartContext context) throws Exception {
ClojureRuntime runtime = this.clojureRuntimeInjector.getValue();
Timer t = new Timer("setting app config");
runtime.invoke("immutant.runtime/set-app-config",
this.fullAppConfig,
this.leinProject);
t.done();
t = new Timer("initializing app");
runtime.invoke("immutant.runtime/initialize",
this.metaData.getInitFunction(),
this.metaData.getConfig());
t.done();
}

@Override
public Void getValue() {
return null;
}

@Override
public synchronized void stop(StopContext context) {
}

public Injector<ClojureRuntime> getClojureRuntimeInjector() {
return this.clojureRuntimeInjector;
}

private String fullAppConfig;
private String leinProject;
private ClojureMetaData metaData;

private InjectedValue<ClojureRuntime> clojureRuntimeInjector = new InjectedValue<ClojureRuntime>();

}
Expand Up @@ -57,6 +57,10 @@ public static ServiceName housekeeper(DeploymentUnit unit) {
public static ServiceName runtime(DeploymentUnit unit) {
return unit.getServiceName().append( CORE ).append( "clojure-runtime" );
}

public static ServiceName appInitializer(DeploymentUnit unit) {
return unit.getServiceName().append( CORE ).append( "application-initializer" );
}

public static ServiceName tmpResourceMounter(DeploymentUnit unit) {
return unit.getServiceName().append( "resource-mounter" );
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.immutant.core.processors.AppDependenciesProcessor;
import org.immutant.core.processors.AppNameRegisteringProcessor;
import org.immutant.core.processors.AppRootRegisteringProcessor;
import org.immutant.core.processors.ApplicationInitializer;
import org.immutant.core.processors.ApplicationInitializerInstaller;
import org.immutant.core.processors.ArchiveRecognizer;
import org.immutant.core.processors.ClojureRuntimeInstaller;
import org.immutant.core.processors.CloserInstaller;
Expand Down Expand Up @@ -107,7 +107,7 @@ protected void addDeploymentProcessors(final DeploymentProcessorTarget processor
processorTarget.addDeploymentProcessor( CoreExtension.SUBSYSTEM_NAME, Phase.POST_MODULE, 141, new AppRootRegisteringProcessor() );
processorTarget.addDeploymentProcessor( CoreExtension.SUBSYSTEM_NAME, Phase.POST_MODULE, 142, new TmpResourceMounterRegisteringInstaller() );

processorTarget.addDeploymentProcessor( CoreExtension.SUBSYSTEM_NAME, Phase.INSTALL, 10000, new ApplicationInitializer() );
processorTarget.addDeploymentProcessor( CoreExtension.SUBSYSTEM_NAME, Phase.INSTALL, 10000, new ApplicationInitializerInstaller() );
}

@SuppressWarnings("serial")
Expand Down

This file was deleted.

@@ -0,0 +1,69 @@
/*
* Copyright 2008-2013 Red Hat, Inc, and individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.immutant.core.processors;

import org.immutant.core.ApplicationInitializer;
import org.immutant.core.ClojureMetaData;
import org.immutant.core.as.CoreServices;
import org.jboss.as.server.deployment.DeploymentPhaseContext;
import org.jboss.as.server.deployment.DeploymentUnit;
import org.jboss.as.server.deployment.DeploymentUnitProcessingException;
import org.jboss.as.server.deployment.DeploymentUnitProcessor;
import org.jboss.logging.Logger;
import org.jboss.msc.service.ServiceController.Mode;

public class ApplicationInitializerInstaller implements DeploymentUnitProcessor {

public ApplicationInitializerInstaller() {
}

/**
* @param phaseContext
*
* @throws DeploymentUnitProcessingException
*/
@Override
public void deploy(DeploymentPhaseContext phaseContext) throws DeploymentUnitProcessingException {
DeploymentUnit unit = phaseContext.getDeploymentUnit();
ClojureMetaData metaData = unit.getAttachment(ClojureMetaData.ATTACHMENT_KEY);

if (metaData == null) {
return;
}

ApplicationInitializer service =
new ApplicationInitializer(unit.getAttachment(ClojureMetaData.FULL_APP_CONFIG),
unit.getAttachment(ClojureMetaData.LEIN_PROJECT),
metaData);

phaseContext.getServiceTarget()
.addService(CoreServices.appInitializer(unit), service)
.addDependency(CoreServices.runtime(unit), service.getClojureRuntimeInjector())
.setInitialMode(Mode.ACTIVE)
.install();
}

@Override
public void undeploy(DeploymentUnit context) {

}

static final Logger log = Logger.getLogger( "org.immutant.core" );
}
4 changes: 2 additions & 2 deletions modules/jobs/src/main/clojure/immutant/jobs.clj
Expand Up @@ -74,5 +74,5 @@ will replace that job."}

(defn internal-scheduler
"Returns the internal Quartz scheduler for use with other libs, e.g. Quartzite"
[& {:keys [singleton] :or {singleton true}}]
(internal/quartz-scheduler singleton))
[]
(internal/quartz-scheduler))
48 changes: 21 additions & 27 deletions modules/jobs/src/main/clojure/immutant/jobs/internal.clj
Expand Up @@ -21,41 +21,36 @@
[clojure.tools.logging :as log])
(:import java.util.Date))

(defn ^:internal job-schedulizer []
(registry/get "job-schedulizer"))

(defn ^{:internal true} create-scheduler
"Creates a scheduler for the current application."
[]
(log/info "Creating job scheduler for" (app-name))
(.createScheduler (job-schedulizer)))

(defn ^{:internal true} scheduler
"Retrieves the appropriate scheduler, creating it if necessary"
[]
(let [name "job-scheduler"]
(if-let [scheduler (registry/get name)]
scheduler
(registry/put name (create-scheduler)))))

(defn ^{:private true} wait-for-scheduler
"Waits for the scheduler to start before invoking f"
([scheduler f]
(wait-for-scheduler scheduler f 30))
([scheduler f attempts]
(defn ^{:private true} wait-for
"Waits for the item to start before invoking f"
([item]
(wait-for item (constantly item)))
([item f]
(wait-for item f 300))
([item f attempts]
(cond
(.isStarted scheduler) (f)
(.isStarted item) (f)
(< attempts 0) (throw (IllegalStateException.
(str "Gave up waiting for " (.getName scheduler) " to start")))
(str "Gave up waiting for " (.getName item) " to start")))
:default (do
(log/debug "Waiting for scheduler" (.getName scheduler) "to start")
(Thread/sleep 1000)
(recur scheduler f (dec attempts))))))
(log/debug "Waiting for" (.getName item) "to start")
(Thread/sleep 100)
(recur item f (dec attempts))))))

(defn ^:internal job-schedulizer []
(wait-for (registry/get "job-schedulizer")))

(defn ^{:internal true} scheduler
"Retrieves the appropriate scheduler, starting it if necessary"
[]
(wait-for (.activateScheduler (job-schedulizer))))

(defn ^:internal quartz-scheduler
"Returns the internal quartz scheduler"
[]
(wait-for-scheduler (scheduler) #(.getScheduler (scheduler))))
(.getScheduler (scheduler)))

(defn ^:internal date
"A wrapper around Date. to facilitate testing"
Expand Down Expand Up @@ -94,7 +89,6 @@
(defn ^{:internal true} create-job
"Instantiates and starts a job"
[f name spec singleton]
(scheduler) ;; creates the scheduler if necessary
((if (map? spec)
create-at-job
create-scheduled-job) f name spec singleton))
Expand Down
42 changes: 29 additions & 13 deletions modules/jobs/src/main/java/org/immutant/jobs/JobSchedulizer.java
Expand Up @@ -21,16 +21,21 @@

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;

import org.immutant.core.HasImmutantRuntimeInjector;
import org.immutant.core.as.CoreServices;
import org.immutant.jobs.as.JobsServices;
import org.jboss.as.server.deployment.DeploymentUnit;
import org.jboss.logging.Logger;
import org.jboss.msc.service.ServiceBuilder;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceController.Mode;
import org.jboss.msc.service.ServiceName;
import org.projectodd.polyglot.core.AtRuntimeInstaller;
import org.projectodd.polyglot.hasingleton.CoordinationMapService;
import org.projectodd.polyglot.hasingleton.HASingletonInstaller;
import org.projectodd.polyglot.jobs.BaseAtJob;
import org.projectodd.polyglot.jobs.BaseJob;


Expand All @@ -40,14 +45,15 @@ public JobSchedulizer(DeploymentUnit unit) {
super( unit );
}

public JobScheduler createScheduler() {
String name = "JobScheduler$" + getUnit().getName();
JobScheduler scheduler = new JobScheduler( name );
ServiceName serviceName = JobsServices.scheduler( getUnit() );

deploy( serviceName, scheduler, false );

return scheduler;
@SuppressWarnings("rawtypes")
public synchronized JobScheduler activateScheduler() {
ServiceController controller =
getUnit().getServiceRegistry().getService(JobsServices.scheduler(getUnit()));
if (controller.getMode() != Mode.ACTIVE) {
controller.setMode(Mode.ACTIVE);
}

return (JobScheduler)controller.getValue();
}

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -80,6 +86,7 @@ public AtJob createAtJob(Callable handler, String name, Date startAt, Date endAt
}

private void installJob(final BaseJob job) {
activateScheduler();
final ServiceName serviceName = JobsServices.job( getUnit(), job.getName() );

replaceService( serviceName,
Expand All @@ -89,11 +96,20 @@ public void run() {
String haName = "job-" + job.getName();
HASingletonInstaller.deployOnce(getUnit(), getTarget(), haName);

ServiceBuilder builder = build(serviceName, job, true, haName);

builder.addDependency( CoreServices.runtime( getUnit() ), ((HasImmutantRuntimeInjector)job).getClojureRuntimeInjector() )
.addDependency( JobsServices.scheduler( getUnit() ), job.getJobSchedulerInjector() )
.install();
ServiceBuilder builder = build(serviceName, job, job.isSingleton(), haName);

builder.addDependency(CoreServices.runtime(getUnit()),
((HasImmutantRuntimeInjector)job).getClojureRuntimeInjector())
.addDependency(JobsServices.scheduler(getUnit()), job.getJobSchedulerInjector());

if (inCluster()
&& job instanceof BaseAtJob
&& job.isSingleton()) {
builder.addDependency(CoordinationMapService.serviceName(getUnit()),
ConcurrentMap.class,
((BaseAtJob)job).getCoordinationMapInjector());
}
builder.install();

}
} );
Expand Down

0 comments on commit 1877b94

Please sign in to comment.