Skip to content

Commit

Permalink
Add PipelineJobIdUtils.getElasticJobConfigurationPOJO() (#29036)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 14, 2023
1 parent 0cbc2da commit 13bbf05
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -86,4 +90,16 @@ public static PipelineContextKey parseContextKey(final String jobId) {
String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10 + databaseNameLength)), StandardCharsets.UTF_8);
return new PipelineContextKey(databaseName, InstanceTypeUtils.decode(instanceType));
}

/**
 * Get ElasticJob configuration POJO.
 *
 * @param jobId job id
 * @return ElasticJob configuration POJO
 * @throws PipelineJobNotFoundException if no ElasticJob configuration is registered for the job id
 */
public static JobConfigurationPOJO getElasticJobConfigurationPOJO(final String jobId) {
    // Resolve the governance context from the encoded job id, then load the raw job configuration from it.
    PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
    JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI(contextKey).getJobConfiguration(jobId);
    ShardingSpherePreconditions.checkNotNull(jobConfigPOJO, () -> new PipelineJobNotFoundException(jobId));
    return jobConfigPOJO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

Expand Down Expand Up @@ -95,6 +96,14 @@ public interface PipelineJobAPI extends TypedSPI {
*/
PipelineJobConfiguration getJobConfiguration(String jobId);

/**
 * Get pipeline job configuration decoded from an already-loaded ElasticJob configuration POJO.
 * Overload of {@link #getJobConfiguration(String)} that avoids a second registry lookup
 * when the caller already holds the POJO.
 *
 * @param jobConfigPOJO ElasticJob job configuration POJO to decode
 * @return pipeline job configuration
 */
PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);

/**
* Get pipeline job info.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
@Override
public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
Optional<InventoryIncrementalJobItemProgress> jobItemProgress = getJobItemProgress(jobId, each);
jobItemProgress.ifPresent(optional -> optional.setActive(!jobConfigPOJO.isDisabled()));
Expand All @@ -101,7 +101,7 @@ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final Pi

@Override
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
Expand Down Expand Up @@ -62,13 +61,11 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
return Optional.of(jobId);
}

protected abstract PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);

@Override
public void startDisabledJob(final String jobId) {
PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
Expand All @@ -85,7 +82,7 @@ public void startDisabledJob(final String jobId) {
public void stop(final String jobId) {
PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
if (jobConfigPOJO.isDisabled()) {
return;
}
Expand All @@ -104,18 +101,6 @@ protected void dropJob(final String jobId) {
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
}

/**
* Get ElasticJob configuration POJO.
*
* @param jobId job id
* @return ElasticJob configuration POJO
*/
public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineJobNotFoundException(jobId));
return result;
}

@Override
public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void startJob(final String jobId, final PipelineSink sink) {
CDCJob job = new CDCJob(jobId, sink);
PipelineJobCenter.addJob(jobId, job);
updateJobConfigurationDisabled(jobId, false);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
Expand All @@ -216,7 +216,7 @@ public void startJob(final String jobId, final PipelineSink sink) {
* @param disabled disabled
*/
public void updateJobConfigurationDisabled(final String jobId, final boolean disabled) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(disabled);
if (disabled) {
jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
Expand Down Expand Up @@ -279,17 +279,17 @@ public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfigurat

@Override
public CDCJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
protected CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
Expand All @@ -305,7 +305,7 @@ public void commit(final String jobId) {
* @param jobId job id
*/
public void dropStreaming(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
dropJob(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
String checkJobId = latestCheckJobId.get();
Optional<ConsistencyCheckJobItemProgress> progress = getJobItemProgress(checkJobId, 0);
ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(checkJobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
result.setActive(!jobConfigPOJO.isDisabled());
if (!progress.isPresent()) {
return result;
Expand Down Expand Up @@ -357,11 +357,11 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo

@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
protected ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat

@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
Expand All @@ -228,11 +228,11 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina

@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}

@Override
protected MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

Expand Down

0 comments on commit 13bbf05

Please sign in to comment.