Permalink
Browse files

Merge pull request #17 from iulukaya/AZK-123-JMX-hooks-Schedule-Unsch…

…edule

AZK-123: Add jmx hooks for Schedule-Unschedule commands
  • Loading branch information...
rbpark committed Sep 27, 2011
2 parents 35dea11 + 7d0f432 commit a7deeae3e96d49b204531990b8c9f74c14d2228f
@@ -35,6 +35,7 @@
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.joda.time.DateTimeZone;
+import azkaban.app.jmx.JobScheduler;
import azkaban.app.jmx.RefreshJobs;
import azkaban.common.jobs.Job;
import azkaban.common.utils.Props;
@@ -97,18 +98,23 @@
private final JobExecutorManager _jobExecutorManager;
private final ScheduleManager _schedulerManager;
+
private MBeanServer mbeanServer;
+ private ObjectName jobRefresherName;
+ private ObjectName jobSchedulerName;
- public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean enableDevMode) throws IOException {
+ public AzkabanApplication(final List<File> jobDirs, final File logDir, final File tempDir, final boolean enableDevMode) throws IOException {
this._jobDirs = Utils.nonNull(jobDirs);
this._logsDir = Utils.nonNull(logDir);
this._tempDir = Utils.nonNull(tempDir);
- if(!this._logsDir.exists())
+ if(!this._logsDir.exists()) {
this._logsDir.mkdirs();
+ }
- if(!this._tempDir.exists())
+ if(!this._tempDir.exists()) {
this._tempDir.mkdirs();
+ }
for(File jobDir: _jobDirs) {
if(!jobDir.exists()) {
@@ -117,8 +123,9 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
}
}
- if(jobDirs.size() < 1)
+ if(jobDirs.size() < 1) {
throw new IllegalArgumentException("No job directory given.");
+ }
Props defaultProps = PropsUtils.loadPropsInDirs(_jobDirs, ".properties", ".schema");
@@ -168,7 +175,9 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
File executionsStorageDir = new File(
defaultProps.getString("azkaban.executions.storage.dir", initialJobDir.getAbsolutePath() + "/executions")
);
- if (! executionsStorageDir.exists()) executionsStorageDir.mkdirs();
+ if (! executionsStorageDir.exists()) {
+ executionsStorageDir.mkdirs();
+ }
long lastExecutionId = getLastExecutionId(executionsStorageDir);
logger.info(String.format("Using path[%s] for storing executions.", executionsStorageDir));
logger.info(String.format("Last known execution id was [%s]", lastExecutionId));
@@ -179,7 +188,7 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
FlowExecutionSerializer flowExecutionSerializer = new FlowExecutionSerializer(flowSerializer);
FlowExecutionDeserializer flowExecutionDeserializer = new FlowExecutionDeserializer(flowDeserializer);
- _monitor = (MonitorImpl)MonitorImpl.getMonitor();
+ _monitor = MonitorImpl.getMonitor();
_allFlows = new CachingFlowManager(
new RefreshableFlowManager(
@@ -208,18 +217,19 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
*/
String server_url = defaultProps.getString("server.url", null) ;
if (server_url != null) {
- if (server_url.endsWith("/"))
- _jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "logs?file=" );
- else
- _jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "/logs?file=" );
+ if (server_url.endsWith("/")) {
+ _jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "logs?file=" );
+ } else {
+ _jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "/logs?file=" );
+ }
}
this._velocityEngine = configureVelocityEngine(enableDevMode);
configureMBeanServer();
}
- private VelocityEngine configureVelocityEngine(boolean devMode) {
+ private VelocityEngine configureVelocityEngine(final boolean devMode) {
VelocityEngine engine = new VelocityEngine();
engine.setProperty("resource.loader", "classpath");
engine.setProperty("classpath.resource.loader.class",
@@ -252,15 +262,27 @@ private void configureMBeanServer() {
logger.info("Registering MBeans...");
mbeanServer = ManagementFactory.getPlatformMBeanServer();
try {
- ObjectName azkabanAppName = new ObjectName("azkaban.app.jmx.RefreshJobs:name=jobRefresher");
- mbeanServer.registerMBean(new RefreshJobs(this), azkabanAppName);
- logger.info("Bean " + azkabanAppName.getCanonicalName() + " registered.");
+ jobRefresherName = new ObjectName("azkaban.app.jmx.RefreshJobs:name=jobRefresher");
+ jobSchedulerName = new ObjectName("azkaban.app.jmx.jobScheduler:name=jobScheduler");
+ mbeanServer.registerMBean(new RefreshJobs(this), jobRefresherName);
+ logger.info("Bean " + jobRefresherName.getCanonicalName() + " registered.");
+ mbeanServer.registerMBean(new JobScheduler(_schedulerManager, _jobManager), jobSchedulerName);
+ logger.info("Bean " + jobSchedulerName.getCanonicalName() + " registered.");
}
catch(Exception e) {
logger.error("Failed to configure MBeanServer", e);
}
}
+ public void close() {
+ try {
+ mbeanServer.unregisterMBean(jobRefresherName);
+ mbeanServer.unregisterMBean(jobSchedulerName);
+ } catch (Exception e) {
+ logger.error("Failed to cleanup MBeanServer", e);
+ }
+ }
+
public String getLogDirectory() {
return _logsDir.getAbsolutePath();
}
@@ -333,7 +355,7 @@ private ClassLoader getBaseClassloader() throws MalformedURLException
return retVal;
}
- private NamedPermitManager getNamedPermitManager(Props props) throws MalformedURLException
+ private NamedPermitManager getNamedPermitManager(final Props props) throws MalformedURLException
{
int workPermits = props.getInt("total.job.permits", Integer.MAX_VALUE);
NamedPermitManager permitManager = new NamedPermitManager();
@@ -342,33 +364,35 @@ private NamedPermitManager getNamedPermitManager(Props props) throws MalformedUR
return permitManager;
}
- private File getBackupFile(Props defaultProps, File initialJobDir)
+ private File getBackupFile(final Props defaultProps, final File initialJobDir)
{
File retVal = new File(initialJobDir.getAbsoluteFile(), "jobs.schedule.backup");
String backupFile = defaultProps.getString("schedule.backup.file", null);
- if(backupFile != null)
+ if(backupFile != null) {
retVal = new File(backupFile);
- else
+ } else {
logger.info("Schedule backup file param not set. Defaulting to " + retVal.getAbsolutePath());
+ }
return retVal;
}
- private File getScheduleFile(Props defaultProps, File initialJobDir)
+ private File getScheduleFile(final Props defaultProps, final File initialJobDir)
{
File retVal = new File(initialJobDir.getAbsoluteFile(), "jobs.schedule");
String scheduleFile = defaultProps.getString("schedule.file", null);
- if(scheduleFile != null)
+ if(scheduleFile != null) {
retVal = new File(scheduleFile);
- else
+ } else {
logger.info("Schedule file param not set. Defaulting to " + retVal.getAbsolutePath());
+ }
return retVal;
}
- private long getLastExecutionId(File executionsStorageDir)
+ private long getLastExecutionId(final File executionsStorageDir)
{
long lastId = 0;
@@ -390,11 +414,11 @@ private long getLastExecutionId(File executionsStorageDir)
}
- public String getRuntimeProperty(String name) {
+ public String getRuntimeProperty(final String name) {
return _jobExecutorManager.getRuntimeProperty(name);
}
- public void setRuntimeProperty(String key, String value) {
+ public void setRuntimeProperty(final String key, final String value) {
_jobExecutorManager.setRuntimeProperty(key, value);
}
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 Adconion, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.app.jmx;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.management.DescriptorKey;
+
+/**
+ * DisplayName - This annotation allows to supply
+ * a display name for a method in the MBean interface.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DisplayName {
+ @DescriptorKey("displayName")
+ String value();
+}
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2011 Adconion, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.app.jmx;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.Hours;
+import org.joda.time.IllegalFieldValueException;
+import org.joda.time.LocalDateTime;
+import org.joda.time.Minutes;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormat;
+
+import azkaban.app.JobDescriptor;
+import azkaban.app.JobManager;
+import azkaban.scheduler.ScheduleManager;
+
+/**
+ * @author ibrahimulukaya
+ * Implements the JobSchedulerMBean
+ */
+public class JobScheduler implements JobSchedulerMBean {
+ private static Logger logger = Logger.getLogger(JobScheduler.class);
+ private ScheduleManager scheduler;
+ private JobManager jobManager;
+
+ public JobScheduler(ScheduleManager scheduler, JobManager jobManager) {
+ this.scheduler = scheduler;
+ this.jobManager = jobManager;
+ }
+
+ public String scheduleWorkflow(String jobName, boolean ignoreDeps,
+ int hour, int minutes, int seconds, String scheduledDate,
+ boolean isRecurring, int period, String periodUnits) {
+ String errorMsg = null;
+ if (jobName == null || jobName.trim().length() == 0) {
+ errorMsg = "You must select at least one job to run.";
+ logger.error(errorMsg);
+ return errorMsg;
+ }
+ JobDescriptor descriptor = jobManager.getJobDescriptor(jobName);
+ if (descriptor == null) {
+ errorMsg = "Job: '" + jobName + "' doesn't exist.";
+ logger.error(errorMsg);
+ return errorMsg;
+ }
+
+ DateTime day = null;
+ DateTime time = null;
+ try {
+ if (scheduledDate == null || scheduledDate.trim().length() == 0) {
+ day = new LocalDateTime().toDateTime();
+ time = day.withHourOfDay(hour).withMinuteOfHour(minutes)
+ .withSecondOfMinute(seconds);
+ if (day.isAfter(time)) {
+ time = time.plusDays(1);
+ }
+ } else {
+ try {
+ day = DateTimeFormat.forPattern("MM-dd-yyyy")
+ .parseDateTime(scheduledDate);
+ } catch (IllegalArgumentException e) {
+ logger.error(e);
+ return "Invalid date: '" + scheduledDate
+ + "', \"MM-dd-yyyy\" format is expected.";
+ }
+ time = day.withHourOfDay(hour).withMinuteOfHour(minutes)
+ .withSecondOfMinute(seconds);
+ }
+ } catch (IllegalFieldValueException e) {
+ logger.error(e);
+ return "Invalid schedule time (see logs): " + e.getMessage();
+
+ }
+ ReadablePeriod thePeriod = null;
+ if (isRecurring) {
+ if ("d".equals(periodUnits)) {
+ thePeriod = Days.days(period);
+ } else if ("h".equals(periodUnits)) {
+ thePeriod = Hours.hours(period);
+ } else if ("m".equals(periodUnits)) {
+ thePeriod = Minutes.minutes(period);
+ } else if ("s".equals(periodUnits)) {
+ thePeriod = Seconds.seconds(period);
+ } else {
+ errorMsg = "Unknown period unit: " + periodUnits;
+ logger.error(errorMsg);
+ return errorMsg;
+ }
+ }
+ try {
+ if (thePeriod == null) {
+ scheduler.schedule(jobName, time, ignoreDeps);
+ } else {
+ scheduler.schedule(jobName, time, thePeriod, ignoreDeps);
+ }
+ return "Schedule Successful!";
+ } catch (Exception e) {
+ logger.error(e);
+ return "Schedule Failed (see logs): " + e.getMessage();
+ }
+ }
+
+ public String removeScheduledWorkflow(String jobName) {
+ String errorMsg = null;
+ if (jobName == null || jobName.trim().length() == 0) {
+ errorMsg = "You must select at least one job to remove.";
+ logger.error(errorMsg);
+ return errorMsg;
+ }
+ if (scheduler.getSchedule(jobName) == null) {
+ errorMsg = "Job: '"+ jobName + "' doesn't exist in schedule.";
+ logger.error(errorMsg);
+ return errorMsg;
+ }
+ try {
+ scheduler.removeScheduledJob(jobName);
+ return "Job: '" + jobName + "' is successfully removed from " +
+ "schedule.";
+ } catch (Exception e) {
+ logger.error(e);
+ return "Removing From Schedule Failed (see logs): " +
+ e.getMessage();
+ }
+ }
+}
Oops, something went wrong.

0 comments on commit a7deeae

Please sign in to comment.