Skip to content

Commit

Permalink
Directly use Quartz replase api to bootstrap a schedule (#15781)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Apr 2, 2024
1 parent 920ac15 commit 0419543
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

public class SchedulerException extends RuntimeException {

public SchedulerException(String message) {
super(message);
public SchedulerException(SchedulerExceptionEnum schedulerExceptionEnum) {
super("Scheduler[" + schedulerExceptionEnum.getCode() + "] " + schedulerExceptionEnum.getMessage());
}

public SchedulerException(String message, Throwable cause) {
super(message, cause);
public SchedulerException(SchedulerExceptionEnum schedulerExceptionEnum, Throwable cause) {
super("Scheduler[" + schedulerExceptionEnum.getCode() + "] " + schedulerExceptionEnum.getMessage(), cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.api;

public interface SchedulerExceptionEnum {

String getCode();

String getMessage();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;

Expand All @@ -33,7 +32,6 @@

import lombok.extern.slf4j.Slf4j;

import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
Expand All @@ -56,10 +54,9 @@ public class ProcessScheduleTask extends QuartzJobBean {
@Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
protected void executeInternal(JobExecutionContext context) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();

int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);
int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
int projectId = quartzJobData.getProjectId();
int scheduleId = quartzJobData.getScheduleId();

Date scheduledFireTime = context.getScheduledFireTime();

Expand Down Expand Up @@ -110,7 +107,7 @@ protected void executeInternal(JobExecutionContext context) {

private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
final Scheduler scheduler = context.getScheduler();
JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId, projectId);
JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
try {
if (scheduler.checkExists(jobKey)) {
log.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.quartz;

import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;

import java.util.Date;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobKey;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;

/**
* QuartzCornTriggerBuilder used to build a {@link CronTrigger} instance.
*/
public class QuartzCornTriggerBuilder implements QuartzTriggerBuilder {

private Integer projectId;

private Schedule schedule;

public static QuartzCornTriggerBuilder newBuilder() {
return new QuartzCornTriggerBuilder();
}

public QuartzCornTriggerBuilder withProjectId(Integer projectId) {
this.projectId = projectId;
return this;
}

public QuartzCornTriggerBuilder withSchedule(Schedule schedule) {
this.schedule = schedule;
return this;
}

@Override
public CronTrigger build() {

if (projectId == null) {
throw new IllegalArgumentException("projectId cannot be null");
}
if (schedule == null) {
throw new IllegalArgumentException("schedule cannot be null");
}

/**
* transform from server default timezone to schedule timezone
* e.g. server default timezone is `UTC`
* user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
* api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
* so when add job to quartz, it should recover by transform timezone
*/
Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), schedule.getTimezoneId());
Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), schedule.getTimezoneId());
/**
* If the start time is less than the current time, the start time is set to the current time.
* We do this change to avoid misfires all triggers when update the scheduler.
*/
Date now = new Date();
if (startDate.before(now)) {
startDate = now;
}
JobKey jobKey = QuartzJobKey.of(projectId, schedule.getId()).toJobKey();

TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup());
return TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startAt(startDate)
.endAt(endDate)
.withSchedule(
CronScheduleBuilder.cronSchedule(schedule.getCrontab())
.withMisfireHandlingInstructionIgnoreMisfires()
.inTimeZone(DateUtils.getTimezone(schedule.getTimezoneId())))
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.quartz;

import lombok.Getter;

import org.quartz.JobDataMap;

@Getter
public class QuartzJobData {

private static final String PROJECT_ID = "projectId";
private static final String SCHEDULE_ID = "scheduleId";

private final Integer projectId;

private final Integer scheduleId;

private QuartzJobData(Integer projectId, Integer scheduleId) {
if (projectId == null) {
throw new IllegalArgumentException("projectId cannot be null");
}
if (scheduleId == null) {
throw new IllegalArgumentException("schedule cannot be null");
}
this.projectId = projectId;
this.scheduleId = scheduleId;
}

public static QuartzJobData of(Integer projectId, Integer scheduleId) {
return new QuartzJobData(projectId, scheduleId);
}

public static QuartzJobData of(JobDataMap jobDataMap) {
Integer projectId = jobDataMap.getInt(PROJECT_ID);
Integer scheduleId = jobDataMap.getInt(SCHEDULE_ID);
return of(projectId, scheduleId);
}

public JobDataMap toJobDataMap() {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(PROJECT_ID, projectId);
jobDataMap.put(SCHEDULE_ID, scheduleId);
return jobDataMap;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.quartz;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;

public class QuartzJobDetailBuilder {

private Integer projectId;

private Integer scheduleId;

public static QuartzJobDetailBuilder newBuilder() {
return new QuartzJobDetailBuilder();
}

public QuartzJobDetailBuilder withProjectId(Integer projectId) {
this.projectId = projectId;
return this;
}

public QuartzJobDetailBuilder withSchedule(Integer scheduleId) {
this.scheduleId = scheduleId;
return this;
}

public JobDetail build() {
if (projectId == null) {
throw new IllegalArgumentException("projectId cannot be null");
}
if (scheduleId == null) {
throw new IllegalArgumentException("scheduleId cannot be null");
}
QuartzJobData quartzJobData = QuartzJobData.of(projectId, scheduleId);

return JobBuilder.newJob(ProcessScheduleTask.class)
.withIdentity(QuartzJobKey.of(projectId, scheduleId).toJobKey())
.setJobData(quartzJobData.toJobDataMap())
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.scheduler.quartz;

import lombok.Getter;

import org.quartz.JobKey;

@Getter
public class QuartzJobKey {

private final int schedulerId;
private final int projectId;

private static final String QUARTZ_JOB_PREFIX = "job";
private static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
private static final String UNDERLINE = "_";

private QuartzJobKey(int projectId, int schedulerId) {
this.schedulerId = schedulerId;
this.projectId = projectId;
}

public static QuartzJobKey of(int projectId, int schedulerId) {
return new QuartzJobKey(projectId, schedulerId);
}

public JobKey toJobKey() {
// todo: We don't need to add prefix to job name and job group?
String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId;
String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId;
return new JobKey(jobName, jobGroup);
}
}

0 comments on commit 0419543

Please sign in to comment.