@@ -178,6 +178,11 @@ protected static long getNextJobId() {
return System.nanoTime() + RandomUtils.nextInt();
}

+ @Override
+ public boolean isJobRunning() {
+ return JobStatus.RUNNING.equals(getJobStatus());
+ }

@Override
public void cancelTaskById(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
@@ -484,8 +489,4 @@ public void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException {
public boolean needPersist() {
return true;
}

- public boolean isFinalStatus() {
- return jobStatus.equals(JobStatus.STOPPED) || jobStatus.equals(JobStatus.FINISHED);
- }
}
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -68,6 +68,13 @@ public interface Job<T extends AbstractTask, C> {
*/
boolean isReadyForScheduling(C taskContext);

+ /**
+ * Checks if the job is running.
+ *
+ * @return True if the job is running.
+ */
+ boolean isJobRunning();

/**
* Retrieves the metadata for the job, which is used to display job information.
*
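Reviewer note: call sites that previously went through the deleted static helper JobUtils.checkNeedSchedule(job) now ask the job directly. A minimal sketch of the guard pattern the executors below adopt; DispatchGuardSketch and shouldDispatch are illustrative names, not part of this PR:

import org.apache.doris.job.base.Job;
import org.apache.doris.job.task.AbstractTask;

// Hypothetical helper; only Job#isReadyForScheduling and the new
// Job#isJobRunning come from this PR.
public final class DispatchGuardSketch {
    private DispatchGuardSketch() {}

    // Dispatch only when the job can accept a task and reports itself running.
    public static <T extends AbstractTask, C> boolean shouldDispatch(Job<T, C> job, C taskContext) {
        return job.isReadyForScheduling(taskContext) && job.isJobRunning();
    }
}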
30 changes: 0 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java

This file was deleted.

@@ -19,7 +19,6 @@

import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
- import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
@@ -55,7 +54,7 @@ public void onEvent(TimerJobEvent<T> event) {
log.info("job is null,may be job is deleted, ignore");
return;
}
- if (event.getJob().isReadyForScheduling(null) && JobUtils.checkNeedSchedule(event.getJob())) {
+ if (event.getJob().isReadyForScheduling(null) && event.getJob().isJobRunning()) {
List<? extends AbstractTask> tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
if (CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is empty, skip scheduler,"
@@ -18,7 +18,6 @@
package org.apache.doris.job.executor;

import org.apache.doris.job.base.AbstractJob;
- import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.disruptor.TaskDisruptor;

import io.netty.util.Timeout;
@@ -40,7 +39,7 @@ public TimerJobSchedulerTask(TaskDisruptor dispatchDisruptor, T job) {
@Override
public void run(Timeout timeout) {
try {
- if (!JobUtils.checkNeedSchedule(job)) {
+ if (!job.isJobRunning()) {
log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId());
return;
}
@@ -100,8 +100,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
.add(new Column("Comment", ScalarType.createStringType()))
// only execute type = streaming need record
.add(new Column("Properties", ScalarType.createStringType()))
.add(new Column("ConsumedOffset", ScalarType.createStringType()))
.add(new Column("MaxOffset", ScalarType.createStringType()))
.add(new Column("CurrentOffset", ScalarType.createStringType()))
.add(new Column("EndOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
.add(new Column("ErrorMsg", ScalarType.createStringType()))
.build();
@@ -114,15 +114,12 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
.addColumn(new Column("EtlInfo", ScalarType.createVarchar(100)))
.addColumn(new Column("TaskInfo", ScalarType.createVarchar(100)))
.addColumn(new Column("ErrorMsg", ScalarType.createVarchar(100)))

.addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
.addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
.addColumn(new Column("TrackingUrl", ScalarType.createVarchar(200)))
.addColumn(new Column("LoadStatistic", ScalarType.createVarchar(200)))
.addColumn(new Column("User", ScalarType.createVarchar(50)))
.addColumn(new Column("FirstErrorMsg", ScalarType.createVarchar(200)))
- // only execute type = streaming need record
- .addColumn(new Column("Offset", ScalarType.createStringType()))
.build();

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -68,8 +68,7 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
new Column("FirstErrorMsg", ScalarType.createStringType()),
new Column("Offset", ScalarType.createStringType()));
new Column("FirstErrorMsg", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

@@ -277,7 +276,6 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ? "" : firstErrorMsg));
trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}

@@ -299,7 +297,6 @@ private TRow getPendingTaskTVFInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(""));
- trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}

@@ -63,6 +63,7 @@
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
+ import org.apache.commons.lang3.StringUtils;

import java.io.DataOutput;
import java.io.IOException;
@@ -203,7 +204,16 @@ protected void checkJobParamsInternal() {

@Override
public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
- return CollectionUtils.isEmpty(getRunningTasks());
+ return CollectionUtils.isEmpty(getRunningTasks()) && !isFinalStatus();
}

+ @Override
+ public boolean isJobRunning() {
+ return !isFinalStatus();
+ }
+
+ private boolean isFinalStatus() {
+ return getJobStatus().equals(JobStatus.STOPPED) || getJobStatus().equals(JobStatus.FINISHED);
+ }

@Override
@@ -354,14 +364,14 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(properties != null
? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));

- if (offsetProvider != null && offsetProvider.getConsumedOffset() != null) {
- trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getConsumedOffset()));
+ if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
+ trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowCurrentOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
}

- if (offsetProvider != null && offsetProvider.getMaxOffset() != null) {
- trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getMaxOffset()));
+ if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowMaxOffset())) {
+ trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getShowMaxOffset()));
} else {
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
}
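Reviewer note: this override deliberately widens the base definition. AbstractJob#isJobRunning() (first hunk above) is true only for JobStatus.RUNNING, while a streaming insert job counts as running in every non-final state, e.g. PAUSED, so the scheduler keeps it alive until it is STOPPED or FINISHED. A self-contained sketch of the two predicates; the enum below only mirrors the statuses named in this diff and is not the Doris JobStatus itself:

import java.util.EnumSet;

public class RunningSemanticsSketch {
    // Mirrors only the statuses referenced in this PR; not the real enum.
    enum JobStatus { PENDING, RUNNING, PAUSED, STOPPED, FINISHED }

    // Base AbstractJob semantics: only RUNNING counts as running.
    static boolean baseIsJobRunning(JobStatus s) {
        return JobStatus.RUNNING == s;
    }

    // StreamingInsertJob semantics: running means "not in a final status".
    static boolean streamingIsJobRunning(JobStatus s) {
        return !EnumSet.of(JobStatus.STOPPED, JobStatus.FINISHED).contains(s);
    }

    public static void main(String[] args) {
        // PAUSED: not running for the base job, still running for streaming.
        System.out.println(baseIsJobRunning(JobStatus.PAUSED));      // false
        System.out.println(streamingIsJobRunning(JobStatus.PAUSED)); // true
    }
}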
@@ -154,8 +154,6 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(""));
- trow.addToColumnValue(new TCell().setStringVal(runningTask.getRunningOffset() == null ? FeConstants.null_string
- : runningTask.getRunningOffset().toString()));
return trow;
}
}
@@ -40,16 +40,16 @@ public interface SourceOffsetProvider {
Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties);

/**
- * Get consumered offset to show
+ * Get current offset to show
* @return
*/
- String getConsumedOffset();
+ String getShowCurrentOffset();

/**
- * Get remote datasource max offset
+ * Get remote datasource max offset to show
* @return
*/
- String getMaxOffset();
+ String getShowMaxOffset();

/**
* Rewrite the TVF parameters in the SQL based on the current offset.
@@ -28,8 +28,6 @@
@Getter
@Setter
public class S3Offset implements Offset {
- // path/1.csv
- String startFile;
@SerializedName("endFile")
String endFile;
// s3://bucket/path/{1.csv,2.csv}
@@ -47,6 +45,6 @@ public boolean isEmpty() {

@Override
public String toString() {
return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" + endFile + "\" }";
return "{\"endFile\": \"" + endFile + "\" }";
}
}
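Reviewer note: with startFile gone, toString() now reports only the end of the consumed range, which matches the single-field JSON the regression test below asserts on. A quick usage sketch; the import path is assumed, and setEndFile is the Lombok-generated setter:

import org.apache.doris.job.offset.s3.S3Offset; // assumed package path

public class S3OffsetToStringSketch {
    public static void main(String[] args) {
        S3Offset offset = new S3Offset();
        offset.setEndFile("regression/load/data/example_1.csv");
        // Prints: {"endFile": "regression/load/data/example_1.csv" }
        System.out.println(offset);
    }
}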
@@ -33,6 +33,7 @@
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Maps;
+ import com.google.gson.Gson;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;

@@ -72,7 +73,6 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, Strin
String bucket = globListResult.getBucket();
String prefix = globListResult.getPrefix();

- offset.setStartFile(startFile);
String bucketBase = "s3://" + bucket + "/";
// Get the path of the last directory
int lastSlash = prefix.lastIndexOf('/');
@@ -109,16 +109,18 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, Strin
}

@Override
- public String getConsumedOffset() {
+ public String getShowCurrentOffset() {
if (currentOffset != null) {
- return currentOffset.getEndFile();
+ return currentOffset.toSerializedJson();
}
return null;
}

@Override
- public String getMaxOffset() {
- return maxEndFile;
+ public String getShowMaxOffset() {
+ Map<String, String> res = new HashMap<>();
+ res.put("endFile", maxEndFile);
+ return new Gson().toJson(res);
}

@Override
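Reviewer note: getShowCurrentOffset() and getShowMaxOffset() now both return JSON, so the CurrentOffset and EndOffset columns of jobs("type"="insert") render the same shape. A sketch of what getShowMaxOffset() emits, mirroring the Gson call above; the file name is illustrative:

import com.google.gson.Gson;

import java.util.HashMap;
import java.util.Map;

public class ShowMaxOffsetSketch {
    public static void main(String[] args) {
        // Same construction as getShowMaxOffset() above.
        Map<String, String> res = new HashMap<>();
        res.put("endFile", "regression/load/data/example_1.csv");
        // Prints: {"endFile":"regression/load/data/example_1.csv"}
        System.out.println(new Gson().toJson(res));
    }
}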
@@ -24,7 +24,6 @@
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
- import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.exception.JobException;
@@ -110,7 +109,7 @@ private void batchSchedulerTimerJob() {
}

public void scheduleOneJob(T job) throws JobException {
- if (!JobUtils.checkNeedSchedule(job)) {
+ if (!job.isJobRunning()) {
return;
}
// not-schedule task
@@ -145,7 +144,7 @@ public void scheduleOneJob(T job) throws JobException {
}

public void cycleTimerJobScheduler(T job) {
- if (!JobUtils.checkNeedSchedule(job)) {
+ if (!job.isJobRunning()) {
return;
}
if (!JobExecuteType.RECURRING.equals(job.getJobConfig().getExecuteType())) {
@@ -226,7 +225,7 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {
clearEndJob(job);
continue;
}
- if (JobUtils.checkNeedSchedule(job) && job.getJobConfig().checkIsTimerJob()) {
+ if (job.isJobRunning() && job.getJobConfig().checkIsTimerJob()) {
cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}
@@ -97,10 +97,10 @@ suite("test_streaming_insert_job") {


def jobOffset = sql """
- select ConsumedOffset, MaxOffset from jobs("type"="insert") where Name='${jobName}'
+ select currentOffset, endoffset from jobs("type"="insert") where Name='${jobName}'
"""
- assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
- assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
+ assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobOffset.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";

// alter streaming job
sql """
@@ -123,21 +123,21 @@
"""

def alterJobProperties = sql """
- select status,properties,ConsumedOffset from jobs("type"="insert") where Name='${jobName}'
+ select status,properties,currentOffset from jobs("type"="insert") where Name='${jobName}'
"""
assert alterJobProperties.get(0).get(0) == "PAUSED"
assert alterJobProperties.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
- assert alterJobProperties.get(0).get(2) == "regression/load/data/example_1.csv"
+ assert alterJobProperties.get(0).get(2) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";

sql """
RESUME JOB where jobname = '${jobName}'
"""
def resumeJobStatus = sql """
- select status,properties,ConsumedOffset from jobs("type"="insert") where Name='${jobName}'
+ select status,properties,currentOffset from jobs("type"="insert") where Name='${jobName}'
"""
assert resumeJobStatus.get(0).get(0) == "RUNNING" || resumeJobStatus.get(0).get(0) == "PENDING"
assert resumeJobStatus.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
- assert resumeJobStatus.get(0).get(2) == "regression/load/data/example_1.csv"
+ assert resumeJobStatus.get(0).get(2) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";

Awaitility.await().atMost(60, SECONDS)
.pollInterval(1, SECONDS).until(