Skip to content

Commit

Permalink
Refactor ConsistencyCheckJobOption (#29208)
Browse files Browse the repository at this point in the history
* Refactor PipelineJobConfigurationManager

* Refactor MigrationJobAPI

* Remove SPI from PipelineJobOption

* Remove SPI from PipelineJobOption

* Refactor PipelineJobManager

* Refactor PipelineJobManager

* Refactor ConsistencyCheckJobOption

* Refactor ConsistencyCheckJobOption

* Refactor ConsistencyCheckJobOptionTest

* Refactor ConsistencyCheckJobOptionTest
  • Loading branch information
terrymanu committed Nov 25, 2023
1 parent bf41095 commit a0b025c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.data.pipeline.core.exception.job;

import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;

Expand All @@ -27,7 +28,7 @@ public final class UncompletedConsistencyCheckJobExistsException extends Pipelin

private static final long serialVersionUID = 2854259384634892428L;

public UncompletedConsistencyCheckJobExistsException(final String jobId) {
super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists.", jobId));
public UncompletedConsistencyCheckJobExistsException(final String jobId, final ConsistencyCheckJobItemProgress progress) {
super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists, progress `%s`", jobId, progress));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl;

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
Expand Down Expand Up @@ -65,50 +65,52 @@
/**
* Consistency check job option.
*/
@Slf4j
public final class ConsistencyCheckJobOption implements PipelineJobOption {

/**
* Create consistency check configuration and start job.
*
* @param param create consistency check job parameter
* @return job id
* @throws UncompletedConsistencyCheckJobExistsException uncompleted consistency check job exists exception
*/
public String createJobAndStart(final CreateConsistencyCheckJobParameter param) {
String parentJobId = param.getParentJobId();
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
Optional<String> latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> progress = jobItemManager.getProgress(latestCheckJobId.get(), 0);
if (!progress.isPresent() || JobStatus.FINISHED != progress.get().getStatus()) {
log.info("check job already exists and status is not FINISHED, progress={}", progress);
throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
}
Optional<ConsistencyCheckJobItemProgress> progress = new PipelineJobItemManager<ConsistencyCheckJobItemProgress>(getYamlJobItemProgressSwapper()).getProgress(latestCheckJobId.get(), 0);
ShardingSpherePreconditions.checkState(progress.isPresent() && JobStatus.FINISHED == progress.get().getStatus(),
() -> new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get(), progress.orElse(null)));
}
verifyPipelineDatabaseType(param);
checkPipelineDatabaseType(param);
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
String result = latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result);
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result);
new PipelineJobManager(this).drop(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
yamlConfig.setJobId(result);
yamlConfig.setParentJobId(parentJobId);
yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName());
yamlConfig.setAlgorithmProps(param.getAlgorithmProps());
yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType());
new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(getYamlConfiguration(result, parentJobId, param)));
return result;
}

private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) {
Collection<DatabaseType> supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps()).getSupportedDatabaseTypes();
private void checkPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) {
Collection<DatabaseType> supportedDatabaseTypes;
try (TableDataConsistencyChecker checker = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps())) {
supportedDatabaseTypes = checker.getSupportedDatabaseTypes();
}
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getSourceDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getSourceDatabaseType()));
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
}

private YamlConsistencyCheckJobConfiguration getYamlConfiguration(final String jobId, final String parentJobId, final CreateConsistencyCheckJobParameter param) {
YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration();
result.setJobId(jobId);
result.setParentJobId(parentJobId);
result.setAlgorithmTypeName(param.getAlgorithmTypeName());
result.setAlgorithmProps(param.getAlgorithmProps());
result.setSourceDatabaseType(param.getSourceDatabaseType().getType());
return result;
}

@Override
public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@

class ConsistencyCheckJobOptionTest {

private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
private final ConsistencyCheckJobOption jobOption = new ConsistencyCheckJobOption();

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper();

Expand All @@ -67,9 +67,9 @@ public static void beforeClass() {
void assertCreateJobConfig() {
MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
String checkJobId = jobOption.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(checkJobId);
ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
Expand All @@ -85,7 +85,7 @@ void assertDropByParentJobId() {
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
String checkJobId = jobOption.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(
new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 0, JobStatus.FINISHED, null);
Expand All @@ -98,12 +98,12 @@ void assertDropByParentJobId() {
}
expectedSequence = 2;
for (int i = 0; i < 2; i++) {
jobAPI.dropByParentJobId(parentJobId);
jobOption.dropByParentJobId(parentJobId);
Optional<String> latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence--));
}
jobAPI.dropByParentJobId(parentJobId);
jobOption.dropByParentJobId(parentJobId);
Optional<String> latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
assertFalse(latestCheckJobId.isPresent());
}
Expand Down

0 comments on commit a0b025c

Please sign in to comment.